Стохастический отбор Apache Flink для потока данных

Я пытаюсь использовать модель StochasticOutlierSelection пакета Apache Flink ML.

Я не могу понять, как использовать его с Kafka в качестве источника данных, я понимаю, что для этого нужен DataSet, а не DataStream, но я, похоже, не могу открыть окно Kafka DataStream, чтобы стать DataSet.

Есть ли способ, которым я могу рассматривать мой поток как серию небольших наборов данных. Например, есть ли способ сказать, что каждые 10 элементов в потоке, которые соответствуют шаблону (скользящее окно по уникальному идентификатору элементов), рассматривают их как набор данных фиксированного размера и обнаруживают любые выбросы в этом наборе данных фиксированного размера?

Сценарий, который я хочу создать:

источник данных -> Тема 1 Кафки -> Предварительная обработка Flink -> Тема 2 Кафки -> Группы Flink по идентификатору -> Обнаружение выбросов по группам

У меня уже есть рабочая реализация до предварительной обработки, и я надеюсь, что Flink сможет удовлетворить мои требования?

0 ответов

Я предполагаю, что вы могли бы создать глобальное окно на основе счетчика и использовать ExecutionEnvironment для получения DataSet. Что-то вроде следующего может работать (getResult вернет DataSet):


      stream.
      keyBy(...).
      window(GlobalWindows.create).
      trigger(CountTrigger.of(10)).
      aggregate(new MyAggregator()).
      ...

    class MyAggregator extends AggregateFunction[..., ..., ...] {  

      var valueList: List[LabeledVector] = List[LabeledVector]()    

      override def createAccumulator(): MyAggregator = new MyAggregator()
      override def add(value: .., accumulator: MyAggregator): ... = ...
      override def merge(agg1: MyAggregator, agg2: MyAggregator): ... = ...
      override def getResult(accumulator: MyAggregator): ... = {
        ExecutionEnvironment.getExecutionEnvironment.fromCollection(valueList)
      }
    }
Другие вопросы по тегам