Оператор windowAll во Flink уменьшает масштаб параллелизации до 1?

У меня есть поток во Flink, который отправляет кубы из источника, выполняет преобразование в кубе (добавляя 1 к каждому элементу в кубе), а затем отправляет его в нисходящем направлении для печати пропускной способности каждую секунду.

Поток распараллеливается на 4 потока.

Если я правильно понимаю, windowAll оператор является непараллельным преобразованием и поэтому должен уменьшить параллелизацию до 1 и использовать его вместе с TumblingProcessingTimeWindows.of(Time.seconds(1)), суммируйте пропускную способность всех распараллеленных подзадач за последнюю секунду и напечатайте ее. Я не уверен, что получаю правильный вывод, так как пропускная способность каждую секунду печатается так:

1> 25
2> 226
3> 354
4> 372
1> 382
2> 403
3> 363
...

Вопрос: печатает ли потоковый принтер пропускную способность из каждого потока (1,2,3 и 4), или он выбирает, например, нить 3, для печати суммы пропускной способности всех подзадач?

Когда я устанавливаю параллелизм среды в начале 1env.setParallelism(1)Я не получаю "x> " до пропускной способности, но я, кажется, получаю такую ​​же (или даже лучшую) пропускную способность, как когда она установлена ​​на 4. Вот так:

45
429
499
505
1
503
524
530
...

Вот фрагмент кода программы:

imports...

public class StreamingCase {
    public static void main(String[] args) throws Exception {
        int parallelism = 4;

        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        env.setParallelism(parallelism);

        DataStream<Cube> start = env
                .addSource(new CubeSource());

        DataStream<Cube> adder = start
                .map(new MapFunction<Cube, Cube>() {
                    @Override
                    public Cube map(Cube cube) throws Exception {
                        return cube.cubeAdd(1);
                    }
                });

        DataStream<Integer> throughput = ((SingleOutputStreamOperator<Cube>) adder)
                .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
                .apply(new AllWindowFunction<Cube, Integer, TimeWindow>() {
                    @Override
                    public void apply(TimeWindow tw,
                                      Iterable<Cube> values,
                                      Collector<Integer> out) throws Exception {
                        int sum = 0;
                        for (Cube c : values)
                            sum++;
                        out.collect(sum);
                    }
                });
        throughput.print();
        env.execute("Cube Stream of Sweetness");
    }
}

1 ответ

Если для параллелизма среды установлено значение 3, и вы используете оператор WindowAll, только параллелизм запускает только оконный оператор 1. Приемник все еще будет работать с параллелизмом 3. Следовательно, план выглядит следующим образом:

In_1 -\               /- Out_1
In_2 --- WindowAll_1 --- Out_2
In_3 -/               \- Out_3

Оператор WindowAll передает свои выходные данные своим последующим задачам, используя стратегию циклического перебора. Это причина того, что разные потоки генерируют записи результатов программы.

Когда вы устанавливаете параллелизм среды на 1, все операторы запускаются с одной задачей.

Другие вопросы по тегам