Почему этот простой тест Flink иногда терпит неудачу?

Я уверен, что это должно быть проблемой Flink, потому что протестированный код действительно прост.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// I don't need this for this particular example, but I use it in other place in my code.
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

SingleOutputStreamOperator<String> linesSource = env.readTextFile(inputFile).setParallelism(1);

SingleOutputStreamOperator<PositionEvent> mappedlines = linesSource.map(new Tokenizer());

SpeedRadar.run(mappedlines)
          .writeAsCsv(String.format("%s/%s", outputFolder, SPEED_RADAR_FILE));

Где SpeedRadar класс это:

public final class SpeedRadar {

    private static final int MAXIMUM_SPEED = 90;

    public static SingleOutputStreamOperator<SpeedEvent> run(SingleOutputStreamOperator<PositionEvent> stream) {
        return stream
                .filter((PositionEvent e) -> e.f2 > MAXIMUM_SPEED)
                .map(new ToSpeedEvent());
    }

Я не думаю, что важно показать вам POJO и некоторые другие классы, которые отсутствуют. Дело в том, что я читаю строки из CSV-файла, как этот: 130,1,65,0,3,0,49,100000 и я фильтрую строки, третье поле которых больше 90.

Это мой простой контрольный пример:

public class SpeedRadarTests extends StreamingMultipleProgramsTestBase {

    private StreamExecutionEnvironment env;

    @Before
    public void createEnv() {
        env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        SpeedEventSink.values.clear();
    }

    @Test
    public void shouldDetectTwoOverSpeedEvents() throws Exception {

        String[] data = new String[]{
                "30,1,91,1,3,0,10,100000",
                "60,2,90,2,2,1,20,200000",
                "90,3,99,3,1,0,30,300000"
        };

        SingleOutputStreamOperator<PositionEvent> source
                = new PositionStreamBuilder(env).fromLines(data).build();

        SpeedRadar.run(source).addSink(new SpeedEventSink());
        env.execute();

        Map<String, SpeedEvent> events = SpeedEventSink.values;
        assertEquals(2, events.size());

    private static class SpeedEventSink implements SinkFunction<SpeedEvent> {

        static final Map<String, SpeedEvent> values = new HashMap<>();

        @Override
        public synchronized void invoke(SpeedEvent speedEvent) throws Exception {
            // I'm sure f1 is unique
            values.put(speedEvent.f1, speedEvent);
        }
    }

}

И вот как я создаю свой "тестовый поток":

public class PositionStreamBuilder {

    private StreamExecutionEnvironment env;
    private SingleOutputStreamOperator<PositionEvent> stream;

    public PositionStreamBuilder(StreamExecutionEnvironment env) {
        this.env = env;
    }

    public PositionStreamBuilder fromLines(String[] lines) {
        stream = env.fromElements(lines)
                .setParallelism(1)
                .map(new VehicleTelematics.Tokenizer());  // the same Tokenizer as before
        return this;
    }

    // more methods here

    public SingleOutputStreamOperator<PositionEvent> build() {
        return stream;
    }

}

Дело в том, что иногда я не знаю, почему утверждение не выполняется, потому что Map имеет только один элемент. Я выполнил шаги, описанные в документации Flink, с той лишь разницей, что я не устанавливаю параллелизм на 1 (но в любом случае, в этом тесте это не должно иметь значения).

Дело в том, что не только этот тест терпит неудачу, иногда другие, которые не должны проваливаться, терпят неудачу. Например, Флинк иногда пропускает одно событие.

Когда я запускаю код с помощью flink run Я никогда не пропускал элемент.

0 ответов

Другие вопросы по тегам