Почему filter() после flatMap() "не полностью" ленив в потоках Java?

У меня есть следующий пример кода:

System.out.println(
       "Result: " +
        Stream.of(1, 2, 3)
                .filter(i -> {
                    System.out.println(i);
                    return true;
                })
                .findFirst()
                .get()
);
System.out.println("-----------");
System.out.println(
       "Result: " +
        Stream.of(1, 2, 3)
                .flatMap(i -> Stream.of(i - 1, i, i + 1))
                .flatMap(i -> Stream.of(i - 1, i, i + 1))
                .filter(i -> {
                    System.out.println(i);
                    return true;
                })
                .findFirst()
                .get()
);

Вывод следующий:

1
Result: 1
-----------
-1
0
1
0
1
2
1
2
3
Result: -1

Отсюда я вижу, что в первом случае stream действительно ведет себя лениво - мы используем findFirst() поэтому, когда у нас есть первый элемент, наша лямбда-фильтр не вызывается. Тем не менее, во втором случае, который использует flatMaps мы видим, что, несмотря на то, что первый элемент, который удовлетворяет условию фильтра, найден (это просто любой первый элемент, поскольку лямбда всегда возвращает true), дальнейшее содержимое потока все еще подается через функцию фильтрации.

Я пытаюсь понять, почему он ведет себя так, а не сдается после того, как вычисляется первый элемент, как в первом случае. Любая полезная информация будет оценена.

8 ответов

Решение

TL;DR, это было решено в JDK-8075939 и исправлено в Java 10.

При изучении реализации (ReferencePipeline.java) мы видим метод [ ссылка ]

@Override
final void forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) {
    do { } while (!sink.cancellationRequested() && spliterator.tryAdvance(sink));
}

который будет вызывать для findFirst операция. Особая забота - это sink.cancellationRequested() что позволяет завершить цикл на первом матче. Сравнить с [ ссылка ]

@Override
public final <R> Stream<R> flatMap(Function<? super P_OUT, ? extends Stream<? extends R>> mapper) {
    Objects.requireNonNull(mapper);
    // We can do better than this, by polling cancellationRequested when stream is infinite
    return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
            return new Sink.ChainedReference<P_OUT, R>(sink) {
                @Override
                public void begin(long size) {
                    downstream.begin(-1);
                }

                @Override
                public void accept(P_OUT u) {
                    try (Stream<? extends R> result = mapper.apply(u)) {
                        // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
                        if (result != null)
                            result.sequential().forEach(downstream);
                    }
                }
            };
        }
    };
}

Метод для продвижения одного предмета заканчивается вызовом forEach в подпотоке без возможности досрочного завершения и комментария в начале flatMap Метод даже рассказывает об этой отсутствующей функции.

Поскольку это нечто большее, чем просто оптимизация, поскольку это означает, что код просто ломается, когда подпоток бесконечен, я надеюсь, что разработчики вскоре докажут, что они "могут добиться большего успеха, чем это"…


Чтобы проиллюстрировать последствия, в то время как Stream.iterate(0, i->i+1).findFirst() работает как положено, Stream.of("").flatMap(x->Stream.iterate(0, i->i+1)).findFirst() будет в конечном итоге в бесконечном цикле.

Что касается спецификации, большинство из них можно найти в

глава "Потоковые операции и конвейеры" спецификации пакета:

...

Промежуточные операции возвращают новый поток. Они всегда ленивы;

...

… Лень также позволяет избежать проверки всех данных, когда в этом нет необходимости; для таких операций, как "найти первую строку длиной более 1000 символов", необходимо только изучить достаточно строк, чтобы найти ту, которая обладает желаемыми характеристиками, без проверки всех строк, доступных из источника. (Это поведение становится еще более важным, когда входной поток бесконечен, а не просто велик.)

...

Кроме того, некоторые операции считаются операциями с коротким замыканием. Промежуточная операция является коротким замыканием, если, когда она представлена ​​бесконечным вводом, она может привести к конечному потоку в результате. Работа терминала является коротким замыканием, если при наличии бесконечного входа она может завершиться за конечное время. Наличие операции короткого замыкания в конвейере является необходимым, но не достаточным условием для прекращения обработки бесконечного потока в обычном режиме за конечное время.

Понятно, что операция с коротким замыканием не гарантирует конечное завершение времени, например, когда фильтр не соответствует ни одному элементу, обработка не может быть завершена, но реализация, которая не поддерживает никакого завершения за конечное время, просто игнорируя короткозамкнутый характер операции далек от спецификации.

Элементы входного потока лениво расходуются один за другим. Первый элемент, 1трансформируется двумя flatMapв поток -1, 0, 1, 0, 1, 2, 1, 2, 3, так что весь поток соответствует только первому элементу ввода. Вложенные потоки охотно материализуются трубопроводом, затем сплющиваются, а затем подаются в filter этап. Это объясняет ваш вывод.

Вышесказанное не вытекает из фундаментального ограничения, но, вероятно, будет намного сложнее получить полноценную лень для вложенных потоков. Я подозреваю, что сделать его еще более сложной задачей было бы еще сложнее. Для сравнения, ленивые seqs Clojure получают еще один слой для каждого такого уровня вложенности. Из-за этой конструкции операции могут даже потерпеть неудачу с StackruError когда вложение осуществляется до крайности.

Что касается разрыва с бесконечными подпотоками, то поведение flatMap становится еще более удивительным, когда в промежуточную (в отличие от терминальной) работу входит короткое замыкание.

Хотя следующее работает, как ожидалось, распечатывая бесконечную последовательность целых чисел

Stream.of("x").flatMap(_x -> Stream.iterate(1, i -> i + 1)).forEach(System.out::println);

следующий код выводит только "1", но все равно не завершается:

Stream.of("x").flatMap(_x -> Stream.iterate(1, i -> i + 1)).limit(1).forEach(System.out::println);

Я не могу представить себе чтение спецификации, в которой это не было ошибкой.

В моей бесплатной библиотеке StreamEx я представил короткозамкнутые коллекторы. При сборе последовательного потока с короткозамкнутым коллектором (типа MoreCollectors.first()) ровно один элемент потребляется из источника. Внутренне это реализовано довольно грязным способом: использование пользовательского исключения для прерывания потока управления. Используя мою библиотеку, ваш образец может быть переписан следующим образом:

System.out.println(
        "Result: " +
                StreamEx.of(1, 2, 3)
                .flatMap(i -> Stream.of(i - 1, i, i + 1))
                .flatMap(i -> Stream.of(i - 1, i, i + 1))
                .filter(i -> {
                    System.out.println(i);
                    return true;
                })
                .collect(MoreCollectors.first())
                .get()
        );

Результат следующий:

-1
Result: -1

Хотя JDK-8075939 был исправлен в Java 11 и перенесен на 10 и 8u222, все еще существует крайний случай flatMap() не лениться при использовании Stream.iterator(): JDK-8267359, все еще присутствует в Java 17.

Этот

      Iterator<Integer> it =
    Stream.of("a", "b")
        .flatMap(s -> Stream
            .of(1, 2, 3, 4)
            .filter(i -> { System.out.println(i); return true; }))
        .iterator();

it.hasNext(); // This consumes the entire flatmapped stream
it.next();

Печать

      1
2
3
4

Пока это:

      Iterator<Integer> it =
    Stream.of("a", "b")
        .flatMap(s -> Stream
            .iterate(1, i -> i)
            .filter(i -> { System.out.println(i); return true; }))
        .iterator();

it.hasNext();
it.next();

Никогда не прекращается

К несчастью .flatMap() не ленивый Тем не менее, обычай flatMap Обходной путь доступен здесь: Почему.flatMap() настолько неэффективен (не ленив) в Java 8 и Java 9

Сегодня тоже наткнулся на этот баг. Поведение не так прямолинейно, потому что простой случай, как показано ниже, работает нормально, но аналогичный производственный код не работает.

 stream(spliterator).map(o -> o).flatMap(Stream::of).flatMap(Stream::of).findAny()

Для парней, которые не могут ждать еще пару лет перехода на JDK-10, есть альтернативный истинный ленивый поток. Он не поддерживает параллель. Он был предназначен для перевода JavaScript, но у меня получилось, потому что интерфейс такой же.

StreamHelper основан на коллекциях, но Spliterator легко адаптировать.

https://github.com/yaitskov/j4ts/blob/stream/src/main/java/javaemul/internal/stream/StreamHelper.java

Я согласен с другими людьми, это ошибка, открытая в JDK-8075939. И так как это все еще не исправлено больше чем через год. Я хотел бы порекомендовать вам: AbacusUtil

N.println("Result: " + Stream.of(1, 2, 3).peek(N::println).first().get());

N.println("-----------");

N.println("Result: " + Stream.of(1, 2, 3)
                        .flatMap(i -> Stream.of(i - 1, i, i + 1))
                        .flatMap(i -> Stream.of(i - 1, i, i + 1))
                        .peek(N::println).first().get());

// output:
// 1
// Result: 1
// -----------
// -1
// Result: -1

Раскрытие информации: я разработчик AbacusUtil.

Другие вопросы по тегам