Интеграционный тест для сложной топологии (несколько входов) в Flink

Мне нужно написать модульный тест для топологии потоковой передачи Flink. Это в основном CoFlatMapFunctionи имеет 2 входа.

Я пытаюсь получить вдохновение от этой страницы: https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html

Порядок входов имеет значение для моей топологии, поэтому при тестировании я не могу использовать StreamExecutionEnvironment#fromCollection для каждого входа, так как я не буду контролировать порядок, в котором точки данных вводятся в каждый вход.

Я пытался создать один вход с помощью StreamExecutionEnvironment#fromCollection и отправить каждый элемент к фактическому вводу моего CoFlatMapFunction на основе их типа, но порядок элементов теряется в этой операции.

Есть ли другой способ написать этот тест?

2 ответа

Решение

В тренировочных упражнениях Flink есть пример использования TwoInputStreamOperatorTestHarness, к которому вы можете обратиться:

https://github.com/dataArtisans/flink-training-exercises/blob/master/src/test/java/com/dataartisans/flinktraining/exercises/datastream_java/process/EventTimeJoinTest.java

Вам понадобятся эти зависимости:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-test-utils-junit</artifactId>
  <version>${flink.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java_2.11</artifactId>
  <version>${flink.version}</version>
  <scope>test</scope>
  <type>test-jar</type>
</dependency>

<dependency>
  <groupId>org.mockito</groupId>
  <artifactId>mockito-all</artifactId>
  <version>1.10.19</version>
  <type>jar</type>
  <scope>test</scope>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-runtime_2.11</artifactId>
  <version>${flink.version}</version>
  <scope>test</scope>
  <type>test-jar</type>
</dependency>

Следует помнить, что это не общедоступный поддерживаемый интерфейс, поэтому он может развиваться неожиданным образом.

Вы хотите использовать TwoInputStreamOperatorTestHarness учебный класс. К сожалению, документация немного скудна. У меня есть тест, использующий этот класс, но он еще не отправлен в ветку 133_stream-test-harness flink-crawler.

Другие вопросы по тегам