Понимание последовательных и параллельных потоковых сплитераторов в Java 8 и Java 9
Вопрос о сплитераторах, на первый взгляд, не простой.
В потоках, .parallel()
изменяет поведение, которое обрабатывает поток Однако я ожидал, что сплитераторы, созданные из последовательных и параллельных потоков, будут одинаковыми. Например, в последовательных потоках обычно .trySplit()
никогда не вызывается, в то время как в параллельных потоках это так, чтобы передать разделенный сплитератор другому потоку.
Различия между stream.spliterator()
против stream.parallel().spliterator()
:
Они могут иметь разные характеристики:
Stream.of(1L, 2L, 3L).limit(2); // ORDERED Stream.of(1L, 2L, 3L).limit(2).parallel(); // SUBSIZED, SIZED, ORDERED
Кажется, здесь обсуждается еще одна политика характеристик сплитератора бессмысленного потока (параллельно рассчитывается лучше): Понимание характеристик глубоко сплитератора в java 8 и java 9
Они могут иметь различное поведение с точки зрения разделения с помощью
.trySplit()
:Stream.of(1L, 2L, 3L); // NON NULL Stream.of(1L, 2L, 3L).limit(2); // NULL Stream.of(1L, 2L, 3L).limit(2).parallel(); // NON NULL
Почему у последних двух разное поведение? Почему я не могу разделить последовательный поток, если хочу? (Может быть полезно отказаться от одного из разделений для быстрой обработки, например).
Большие воздействия при преобразовании сплитераторов в поток:
spliterator = Stream.of(1L, 2L, 3L).limit(2).spliterator(); stream = StreamSupport.stream(spliterator, true); // No parallel processing!
В этом случае сплитератор был создан из последовательного потока, который отключает возможность разделения (.trySplit()
возвращает ноль). Когда позже потребуется преобразовать обратно в поток, этот поток не выиграет от параллельной обработки. Стыд.
Большой вопрос: как обходной путь, каковы основные последствия всегда преобразовывать поток в параллель перед вызовом .spliterator()
?
// Supports activation of parallel processing later
public static <T> Stream<T> myOperation(Stream<T> stream) {
boolean isParallel = stream.isParallel();
Spliterator<T> spliterator = stream.parallel().spliterator();
return StreamSupport.stream(new Spliterator<T>() {
// My implementation of the interface here (omitted for clarity)
}, isParallel).onClose(stream::close);
}
// Now I have the option to use parallel processing when needed:
myOperation(stream).skip(1).parallel()...
1 ответ
Это не общее свойство сплитераторов, а только обертывание сплитераторов, инкапсулирующих потоковый конвейер.
Когда ты звонишь spliterator()
в потоке, который был сгенерирован из сплитератора и не имеет цепочки операций, вы получите сплитератор источника, который может поддерживать или не поддерживать trySplit
независимо от потока parallel
государство.
ArrayList<String> list = new ArrayList<>();
Collections.addAll(list, "foo", "bar", "baz");
Spliterator<String> sp1 = list.spliterator(), sp2=list.stream().spliterator();
// true
System.out.println(sp1.getClass()==sp2.getClass());
// not null
System.out.println(sp2.trySplit());
также
Spliterator<String> sp = Stream.of("foo", "bar", "baz").spliterator();
// not null
System.out.println(sp.trySplit());
Но как только вы цепочки операций перед вызовом spliterator()
, вы получите сплитератор, оборачивающий конвейер потока. Теперь можно было бы реализовать выделенные сплитераторы, выполняющие связанную операцию, например LimitSpliterator
или MappingSpliterator
, но это не было сделано, поскольку преобразование потока обратно в сплитератор рассматривалось как последнее средство, когда другие операции терминала не подходят, а не в случае использования с высоким приоритетом. Вместо этого вы всегда получите экземпляр единственного класса реализации, который пытается преобразовать внутреннюю работу реализации потокового конвейера в API-интерфейс spliterator.
Это может быть довольно сложным для операций с состоянием, особенно, sorted
, distinct
или же skip
& limit
для не SIZED
поток. Для тривиальных операций без сохранения состояния, таких как map
или же filter
было бы намного проще обеспечить поддержку, как было отмечено в комментарии к коду
Абстрактный упаковочный сплитератор, который связывается с сплитератором конвейерного помощника при первой операции. Этот сплитератор не имеет позднего связывания и будет привязан к исходному сплитератору при первой работе. Сплитератор переноса, созданный из последовательного потока, не может быть разделен, если присутствуют операции с состоянием.
… // @@@ Detect if stateful operations are present or not // If not then can split otherwise cannot /** * True if this spliterator supports splitting */ final boolean isParallel;
но кажется, что в настоящее время это обнаружение не было реализовано, и все промежуточные операции обрабатываются как операции с состоянием.
Spliterator<String> sp = Stream.of("foo", "bar", "baz").map(x -> x).spliterator();
// null
System.out.println(sp.trySplit());
Когда вы пытаетесь обойти это, всегда звоня parallel
, не будет никакого воздействия, когда потоковый конвейер состоит только из операций без сохранения состояния. Но при наличии операции с состоянием это может значительно изменить поведение. Например, когда у вас есть sorted
шаг, все элементы должны быть буферизованы и отсортированы, прежде чем вы сможете использовать первый элемент. Для параллельного потока он, вероятно, будет использовать parallelSort
даже когда ты никогда не вызываешь trySplit
,