Поведение Stream.skip с неупорядоченной работой терминала

Я уже читал этот и этот вопросы, но все еще сомневаюсь, наблюдается ли наблюдаемое поведение Stream.skip было задумано авторами JDK.

Давайте просто введем числа 1..20:

List<Integer> input = IntStream.rangeClosed(1, 20).boxed().collect(Collectors.toList());

Теперь давайте создадим параллельный поток, объединим unordered() с skip() по-разному и собираем результат:

System.out.println("skip-skip-unordered-toList: "
        + input.parallelStream().filter(x -> x > 0)
            .skip(1)
            .skip(1)
            .unordered()
            .collect(Collectors.toList()));
System.out.println("skip-unordered-skip-toList: "
        + input.parallelStream().filter(x -> x > 0)
            .skip(1)
            .unordered()
            .skip(1)
            .collect(Collectors.toList()));
System.out.println("unordered-skip-skip-toList: "
        + input.parallelStream().filter(x -> x > 0)
            .unordered()
            .skip(1)
            .skip(1)
            .collect(Collectors.toList()));

Этап фильтрации здесь по сути ничего не делает, но добавляет дополнительные трудности для потокового движка: теперь он не знает точный размер вывода, поэтому некоторые оптимизации отключены. У меня есть следующие результаты:

skip-skip-unordered-toList: [3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
// absent values: 1, 2
skip-unordered-skip-toList: [2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 16, 17, 18, 19, 20]
// absent values: 1, 15
unordered-skip-skip-toList: [1, 2, 3, 4, 5, 6, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 19, 20]
// absent values: 7, 18

Результаты полностью в порядке, все работает как положено. В первом случае я попросил пропустить первые два элемента, а затем собрать их в списке в произвольном порядке. Во втором случае я попросил пропустить первый элемент, затем превратить в неупорядоченный и пропустить еще один элемент (мне все равно, какой именно). В третьем случае я сначала переключился в неупорядоченный режим, затем пропустил два произвольных элемента.

Давайте пропустим один элемент и соберем его в пользовательскую коллекцию в неупорядоченном режиме. Наша коллекция будет HashSet:

System.out.println("skip-toCollection: "
        + input.parallelStream().filter(x -> x > 0)
        .skip(1)
        .unordered()
        .collect(Collectors.toCollection(HashSet::new)));

Выход удовлетворительный:

skip-toCollection: [2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
// 1 is skipped

В общем, я ожидаю, что пока поток упорядочен, skip() пропускает первые элементы, иначе пропускает произвольные.

Однако давайте использовать эквивалентную неупорядоченную операцию терминала collect(Collectors.toSet()):

System.out.println("skip-toSet: "
        + input.parallelStream().filter(x -> x > 0)
            .skip(1)
            .unordered()
            .collect(Collectors.toSet()));

Теперь вывод:

skip-toSet: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 14, 15, 16, 17, 18, 19, 20]
// 13 is skipped

Тот же результат может быть достигнут с любой другой неупорядоченной работой терминала (например, forEach, findAny, anyMatch, так далее.). Удаление unordered() шаг в этом случае ничего не меняет. Кажется что пока unordered() шаг правильно делает поток неупорядоченным, начиная с текущей операции, операция неупорядоченного терминала делает весь поток неупорядоченным, начиная с самого начала, несмотря на то, что это может повлиять на результат, если skip() использовался. Мне кажется, что это вводит в заблуждение: я ожидаю, что использование неупорядоченного коллектора - это то же самое, что переключение потока в неупорядоченный режим непосредственно перед работой терминала и использование эквивалентного упорядоченного коллектора.

Итак, мои вопросы:

  1. Это поведение предназначено или это ошибка?
  2. Если да, это где-то задокументировано? Я прочитал документацию Stream.skip(): он ничего не говорит о неупорядоченных терминальных операциях. Также документация Characteristics.UNORDERED не очень понятна и не говорит о том, что порядок будет потерян для всего потока. Наконец, раздел " Заказ " в сводке пакета также не охватывает этот случай. Возможно, я что-то упустил?
  3. Если предполагается, что неупорядоченная работа терминала делает весь поток неупорядоченным, почему unordered() шаг делает его неупорядоченным только с этого момента? Могу ли я положиться на это поведение? Или мне просто повезло, что мои первые тесты работают хорошо?

2 ответа

Решение

Напомним, что целью флагов потока (ORDERED, SORTED, SIZED, DISTINCT) является включение операций во избежание ненужной работы. Примерами оптимизации, включающей флаги потока, являются:

  • Если мы знаем, что поток уже отсортирован, то sorted() не работает;
  • Если мы знаем размер потока, мы можем предварительно выделить массив правильного размера в toArray()избегая копирования;
  • Если мы знаем, что входные данные не имеют значимого порядка встречи, нам не нужно предпринимать дополнительные шаги для сохранения порядка встречи.

Каждый этап конвейера имеет набор флагов потока. Промежуточные операции могут вводить, сохранять или очищать флаги потока. Например, фильтрация сохраняет отсортированность / отличность, но не размерность; отображение сохраняет размерность, но не сортированность или отличимость. Сортировка внедряет сортировку. Обработка флагов для промежуточных операций довольно проста, потому что все решения являются локальными.

Обработка флагов для терминальных операций более тонкая. ORDERED - наиболее подходящий флаг для терминальных операций. И если терминальная операция НЕПРАВИЛЬНА, то мы делаем обратное распространение неупорядоченности.

Почему мы это делаем? Хорошо, рассмотрим этот конвейер:

set.stream()
   .sorted()
   .forEach(System.out::println);

поскольку forEach не ограничен в работе по порядку, работа по сортировке списка полностью напрасна. Таким образом, мы распространяем эту информацию обратно (пока не столкнемся с операцией короткого замыкания, такой как limit), чтобы не потерять эту возможность оптимизации. Точно так же мы можем использовать оптимизированную реализацию distinct на неупорядоченных потоках.

Это поведение предназначено или это ошибка?

Да:) Предполагается обратное распространение, поскольку это полезная оптимизация, которая не должна давать неправильных результатов. Тем не менее, часть ошибки заключается в том, что мы распространяемся мимо предыдущего skipчто мы не должны. Так что обратное распространение флага UNORDERED слишком агрессивно, и это ошибка. Мы опубликуем ошибку.

Если да, это где-то задокументировано?

Это должна быть просто деталь реализации; если бы это было правильно реализовано, вы бы не заметили (за исключением того, что ваши потоки быстрее.)

@Ruben, ты наверное не понимаешь мой вопрос. Грубо говоря, проблема в том, почему unordered(). Collect(toCollection(HashSet::new)) ведет себя иначе, чем collect(toSet()). Конечно, я знаю, что toSet () неупорядочен.

Возможно, но, во всяком случае, я дам ему вторую попытку.

Взглянув на Javadocs Collectors toSet и toCollection, мы видим, что toSet предоставляет неупорядоченный коллектор

Это {@link Collector.Characteristics#UNORDERED unordered} коллектор.

то есть, CollectorImpl с НЕПРАВИЛЬНОЙ характеристикой. Взглянув на Javadoc Collector.Characteristics # UNORDERED мы можем прочитать:

Указывает, что операция сбора не обязана сохранять порядок встречи элементов ввода

В Javadocs Collector мы также можем увидеть:

Для одновременных коллекторов реализация свободна (но не обязана) реализовывать сокращение одновременно. Параллельное сокращение - это такое, когда функция-накопитель вызывается одновременно из нескольких потоков, используя один и тот же одновременно изменяемый контейнер результатов, вместо того, чтобы сохранять результат изолированным во время накопления. Одновременное сокращение должно применяться только в том случае, если коллектор имеет характеристики {@link Characteristics#UNORDERED} или исходные данные неупорядочены

Для меня это означает, что, если мы устанавливаем характеристику UNORDERED, нам совершенно наплевать на порядок, в котором элементы потока передаются в аккумулятор, и, следовательно, элементы могут быть извлечены из конвейера в любом порядке.,

Кстати, вы получаете то же самое поведение, если вы опустите unordered () в вашем примере:

    System.out.println("skip-toSet: "
            + input.parallelStream().filter(x -> x > 0)
                .skip(1)
                .collect(Collectors.toSet()));

Кроме того, метод skip() в Stream дает нам подсказку:

Хотя {@code skip()} обычно является дешевой операцией для последовательных потоковых конвейеров, она может быть довольно дорогой для упорядоченных параллельных конвейеров

а также

Использование неупорядоченного источника потока (такого как {@link #generate(Supplier)}) или удаление ограничения упорядочения с помощью {@link #unordered()} может привести к значительному ускорению

Когда используешь

Collectors.toCollection(HashSet::new)

вы создаете нормальный "упорядоченный" коллектор (один без характеристики UNORDERED), что для меня означает, что вы заботитесь о порядке, и, следовательно, элементы извлекаются по порядку, и вы получаете ожидаемое поведение.

Другие вопросы по тегам