Почему этот простой тест Flink иногда терпит неудачу?
Я уверен, что это должно быть проблемой Flink, потому что протестированный код действительно прост.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// I don't need this for this particular example, but I use it in other place in my code.
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
SingleOutputStreamOperator<String> linesSource = env.readTextFile(inputFile).setParallelism(1);
SingleOutputStreamOperator<PositionEvent> mappedlines = linesSource.map(new Tokenizer());
SpeedRadar.run(mappedlines)
.writeAsCsv(String.format("%s/%s", outputFolder, SPEED_RADAR_FILE));
Где SpeedRadar
класс это:
public final class SpeedRadar {
private static final int MAXIMUM_SPEED = 90;
public static SingleOutputStreamOperator<SpeedEvent> run(SingleOutputStreamOperator<PositionEvent> stream) {
return stream
.filter((PositionEvent e) -> e.f2 > MAXIMUM_SPEED)
.map(new ToSpeedEvent());
}
Я не думаю, что важно показать вам POJO и некоторые другие классы, которые отсутствуют. Дело в том, что я читаю строки из CSV-файла, как этот: 130,1,65,0,3,0,49,100000
и я фильтрую строки, третье поле которых больше 90.
Это мой простой контрольный пример:
public class SpeedRadarTests extends StreamingMultipleProgramsTestBase {
private StreamExecutionEnvironment env;
@Before
public void createEnv() {
env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
SpeedEventSink.values.clear();
}
@Test
public void shouldDetectTwoOverSpeedEvents() throws Exception {
String[] data = new String[]{
"30,1,91,1,3,0,10,100000",
"60,2,90,2,2,1,20,200000",
"90,3,99,3,1,0,30,300000"
};
SingleOutputStreamOperator<PositionEvent> source
= new PositionStreamBuilder(env).fromLines(data).build();
SpeedRadar.run(source).addSink(new SpeedEventSink());
env.execute();
Map<String, SpeedEvent> events = SpeedEventSink.values;
assertEquals(2, events.size());
private static class SpeedEventSink implements SinkFunction<SpeedEvent> {
static final Map<String, SpeedEvent> values = new HashMap<>();
@Override
public synchronized void invoke(SpeedEvent speedEvent) throws Exception {
// I'm sure f1 is unique
values.put(speedEvent.f1, speedEvent);
}
}
}
И вот как я создаю свой "тестовый поток":
public class PositionStreamBuilder {
private StreamExecutionEnvironment env;
private SingleOutputStreamOperator<PositionEvent> stream;
public PositionStreamBuilder(StreamExecutionEnvironment env) {
this.env = env;
}
public PositionStreamBuilder fromLines(String[] lines) {
stream = env.fromElements(lines)
.setParallelism(1)
.map(new VehicleTelematics.Tokenizer()); // the same Tokenizer as before
return this;
}
// more methods here
public SingleOutputStreamOperator<PositionEvent> build() {
return stream;
}
}
Дело в том, что иногда я не знаю, почему утверждение не выполняется, потому что Map
имеет только один элемент. Я выполнил шаги, описанные в документации Flink, с той лишь разницей, что я не устанавливаю параллелизм на 1 (но в любом случае, в этом тесте это не должно иметь значения).
Дело в том, что не только этот тест терпит неудачу, иногда другие, которые не должны проваливаться, терпят неудачу. Например, Флинк иногда пропускает одно событие.
Когда я запускаю код с помощью flink run
Я никогда не пропускал элемент.