Flink Streaming: поток данных, который контролируется потоком управления

У меня есть вопрос, который является вариацией этого вопроса: Flink: как сохранить состояние и использовать в другом потоке?

У меня есть два потока:

  1. val ipStream: DataStream[IPAddress] = ???
  2. val routeStream: DataStream[RoutingTable] = ???

Я хочу узнать, какой маршрут использует пакет. Обычно это можно сделать с помощью:

val ip = IPAddress("10.10.10.10")
val table = RoutingTable(Seq("10.10.10.0/24", "5.5.5.0/24"))
val route = table.lookup(ip) // == "10.10.10.0/24"

Проблема здесь в том, что я не могу на самом деле ввести ключ здесь, так как для этого требуется как полная таблица, так и IP-адрес (и ключи должны быть вычислены изолированно).

Для каждого элемента из ipStreamМне нужна последняя routeStream элемент. Прямо сейчас я использую взломать, что все это обрабатывается не параллельно:

ipStream
  .connect(routeStream)
  .keyBy(_ => 0, _ => 0)
  .flatMap(new MyRichCoFlatMapFunction) // with ValueState[RoutingTable]

Это звучит как вариант использования для стратегии вещания. Однако routeStream будет обновлен и не будет зафиксирован в файле. Остается вопрос: есть ли способ иметь два потока, один из которых содержит изменяющиеся управляющие данные для другого потока?

1 ответ

Решение

Поскольку я решил проблему, я мог бы написать здесь ответ:)

Я набрал два потока следующим образом:

  1. Поток RoutingTable был введен с использованием первого байта сетевого маршрута.
  2. IP-адрес был также введен первым байтом адреса

Это работает при условии, что IP-пакеты обычно маршрутизируются в сети с одинаковым префиксом /8, что можно предположить для большей части трафика.

Затем, имея состояние RichCoFlatMap можно построить состояние таблицы маршрутизации как ключ. При получении нового IP-пакета выполните поиск в таблице маршрутизации. Теперь есть два возможных сценария:

  1. Не найдено ни одного подходящего маршрута. Мы могли бы хранить пакет на потом здесь, но отказ от него работает также.
  2. Если маршрут был найден, выведите кортеж [IPAddress, RoutingTableEntry].

Таким образом, у нас есть два потока, один из которых имеет изменяемые управляющие данные для другого потока.

Другие вопросы по тегам