Понимание последовательных и параллельных потоковых сплитераторов в Java 8 и Java 9

Вопрос о сплитераторах, на первый взгляд, не простой.

В потоках, .parallel() изменяет поведение, которое обрабатывает поток Однако я ожидал, что сплитераторы, созданные из последовательных и параллельных потоков, будут одинаковыми. Например, в последовательных потоках обычно .trySplit() никогда не вызывается, в то время как в параллельных потоках это так, чтобы передать разделенный сплитератор другому потоку.

Различия между stream.spliterator() против stream.parallel().spliterator():

  1. Они могут иметь разные характеристики:

    Stream.of(1L, 2L, 3L).limit(2);            // ORDERED
    Stream.of(1L, 2L, 3L).limit(2).parallel(); // SUBSIZED, SIZED, ORDERED
    

Кажется, здесь обсуждается еще одна политика характеристик сплитератора бессмысленного потока (параллельно рассчитывается лучше): Понимание характеристик глубоко сплитератора в java 8 и java 9

  1. Они могут иметь различное поведение с точки зрения разделения с помощью .trySplit():

    Stream.of(1L, 2L, 3L);                     // NON NULL
    Stream.of(1L, 2L, 3L).limit(2);            // NULL
    Stream.of(1L, 2L, 3L).limit(2).parallel(); // NON NULL
    

Почему у последних двух разное поведение? Почему я не могу разделить последовательный поток, если хочу? (Может быть полезно отказаться от одного из разделений для быстрой обработки, например).

  1. Большие воздействия при преобразовании сплитераторов в поток:

    spliterator = Stream.of(1L, 2L, 3L).limit(2).spliterator();
    stream = StreamSupport.stream(spliterator, true); // No parallel processing!
    

В этом случае сплитератор был создан из последовательного потока, который отключает возможность разделения (.trySplit() возвращает ноль). Когда позже потребуется преобразовать обратно в поток, этот поток не выиграет от параллельной обработки. Стыд.

Большой вопрос: как обходной путь, каковы основные последствия всегда преобразовывать поток в параллель перед вызовом .spliterator()?

// Supports activation of parallel processing later
public static <T> Stream<T> myOperation(Stream<T> stream) {
    boolean isParallel = stream.isParallel();
    Spliterator<T> spliterator = stream.parallel().spliterator();
    return StreamSupport.stream(new Spliterator<T>() {
        // My implementation of the interface here (omitted for clarity)
    }, isParallel).onClose(stream::close);
}

// Now I have the option to use parallel processing when needed:
myOperation(stream).skip(1).parallel()...

1 ответ

Это не общее свойство сплитераторов, а только обертывание сплитераторов, инкапсулирующих потоковый конвейер.

Когда ты звонишь spliterator() в потоке, который был сгенерирован из сплитератора и не имеет цепочки операций, вы получите сплитератор источника, который может поддерживать или не поддерживать trySplit независимо от потока parallel государство.

ArrayList<String> list = new ArrayList<>();
Collections.addAll(list, "foo", "bar", "baz");
Spliterator<String> sp1 = list.spliterator(), sp2=list.stream().spliterator();
// true
System.out.println(sp1.getClass()==sp2.getClass());
// not null
System.out.println(sp2.trySplit());

также

Spliterator<String> sp = Stream.of("foo", "bar", "baz").spliterator();
// not null
System.out.println(sp.trySplit());

Но как только вы цепочки операций перед вызовом spliterator(), вы получите сплитератор, оборачивающий конвейер потока. Теперь можно было бы реализовать выделенные сплитераторы, выполняющие связанную операцию, например LimitSpliterator или MappingSpliterator, но это не было сделано, поскольку преобразование потока обратно в сплитератор рассматривалось как последнее средство, когда другие операции терминала не подходят, а не в случае использования с высоким приоритетом. Вместо этого вы всегда получите экземпляр единственного класса реализации, который пытается преобразовать внутреннюю работу реализации потокового конвейера в API-интерфейс spliterator.

Это может быть довольно сложным для операций с состоянием, особенно, sorted, distinct или же skip & limit для не SIZED поток. Для тривиальных операций без сохранения состояния, таких как map или же filter было бы намного проще обеспечить поддержку, как было отмечено в комментарии к коду

Абстрактный упаковочный сплитератор, который связывается с сплитератором конвейерного помощника при первой операции. Этот сплитератор не имеет позднего связывания и будет привязан к исходному сплитератору при первой работе. Сплитератор переноса, созданный из последовательного потока, не может быть разделен, если присутствуют операции с состоянием.

…

   // @@@ Detect if stateful operations are present or not
   //     If not then can split otherwise cannot

   /**
    * True if this spliterator supports splitting
    */
   final boolean isParallel;

но кажется, что в настоящее время это обнаружение не было реализовано, и все промежуточные операции обрабатываются как операции с состоянием.

Spliterator<String> sp = Stream.of("foo", "bar", "baz").map(x -> x).spliterator();
// null
System.out.println(sp.trySplit());

Когда вы пытаетесь обойти это, всегда звоня parallel, не будет никакого воздействия, когда потоковый конвейер состоит только из операций без сохранения состояния. Но при наличии операции с состоянием это может значительно изменить поведение. Например, когда у вас есть sorted шаг, все элементы должны быть буферизованы и отсортированы, прежде чем вы сможете использовать первый элемент. Для параллельного потока он, вероятно, будет использовать parallelSort даже когда ты никогда не вызываешь trySplit,

Другие вопросы по тегам