Параллельный поток вызывает Spliterator больше раз, чем его предел
Я недавно обнаружил ошибку, в которой
StreamSupport.intStream(/* a Spliterator.ofInt */, true)
.limit(20)
вызывал Spliterator.ofInt.tryAdvance
более 20 раз. Когда я изменил это на
StreamSupport.intStream(/* a Spliterator.ofInt */, true)
.sequential()
.limit(20)
проблема ушла. Почему это происходит? Есть ли способ добиться строгого ограничения параллельного потока, когда tryAdvance
имеет побочные эффекты, кроме встраивания одного в Spliterator
? (Это для тестирования некоторых методов, которые возвращают неограниченное количество потоков, но где тесты должны достигнуть конечного конца без усложнения конструкции "цикл за X миллисекунд".)
3 ответа
Кажется, есть фундаментальное недоразумение о том, как limit
а также trySplit
должен взаимодействовать. Предположение, что не должно быть больше trySplit
вызовы, чем указано limit
, совершенно неправильно.
Цель trySplit
состоит в том, чтобы разделить исходные данные на две части, на две половины в лучшем случае, как trySplit
должен попытаться сбалансированного раскола. Таким образом, если у вас есть исходный набор данных из одного миллиона элементов, успешное разделение даст два исходных набора данных по полмиллиона элементов в каждом. Это совершенно не связано с limit(20)
вы могли бы применить к потоку, за исключением того, что мы знаем заранее, что мы можем отбросить второй набор данных, если сплитератор имеет SIZED|SUBSIZED
характеристики, так как запрошенные первые двадцать элементов могут быть найдены только в пределах первой половины миллиона.
Нетрудно подсчитать, что в лучшем случае, то есть при сбалансированном разбиении, нам нужно уже пятнадцать операций разделения, каждый раз отбрасывая верхнюю половину, прежде чем мы получим разделение между первыми двадцатью элементами, что позволит нам обработать эти первые двадцать элементы параллельно.
Что можно легко продемонстрировать:
class DebugSpliterator extends Spliterators.AbstractIntSpliterator {
int current, fence;
DebugSpliterator() {
this(0, 1_000_000);
}
DebugSpliterator(int start, int end) {
super(end-start, ORDERED|SIZED|SUBSIZED);
current = start;
fence = end;
}
@Override public boolean tryAdvance(IntConsumer action) {
if(current<fence) {
action.accept(current++);
return true;
}
return false;
}
@Override public OfInt trySplit() {
int mid = (current+fence)>>>1;
System.out.println("trySplit() ["+current+", "+mid+", "+fence+"]");
return mid>current? new DebugSpliterator(current, current=mid): null;
}
}
StreamSupport.stream(new DebugSpliterator(), true)
.limit(20)
.forEach(x -> {});
На моей машине он печатает:
trySplit() [0, 500000, 1000000]
trySplit() [0, 250000, 500000]
trySplit() [0, 125000, 250000]
trySplit() [0, 62500, 125000]
trySplit() [0, 31250, 62500]
trySplit() [0, 15625, 31250]
trySplit() [0, 7812, 15625]
trySplit() [0, 3906, 7812]
trySplit() [0, 1953, 3906]
trySplit() [0, 976, 1953]
trySplit() [0, 488, 976]
trySplit() [0, 244, 488]
trySplit() [0, 122, 244]
trySplit() [0, 61, 122]
trySplit() [0, 30, 61]
trySplit() [0, 15, 30]
trySplit() [15, 22, 30]
trySplit() [15, 18, 22]
trySplit() [15, 16, 18]
trySplit() [16, 17, 18]
trySplit() [0, 7, 15]
trySplit() [18, 20, 22]
trySplit() [18, 19, 20]
trySplit() [7, 11, 15]
trySplit() [0, 3, 7]
trySplit() [3, 5, 7]
trySplit() [3, 4, 5]
trySplit() [7, 9, 11]
trySplit() [4, 4, 5]
trySplit() [9, 10, 11]
trySplit() [11, 13, 15]
trySplit() [0, 1, 3]
trySplit() [13, 14, 15]
trySplit() [7, 8, 9]
trySplit() [1, 2, 3]
trySplit() [8, 8, 9]
trySplit() [5, 6, 7]
trySplit() [14, 14, 15]
trySplit() [17, 17, 18]
trySplit() [11, 12, 13]
trySplit() [12, 12, 13]
trySplit() [2, 2, 3]
trySplit() [10, 10, 11]
trySplit() [6, 6, 7]
что, конечно, гораздо больше, чем двадцать попыток разделения, но вполне разумно, так как набор данных должен быть разбит до тех пор, пока у нас не будет поддиапазонов в пределах желаемого целевого диапазона, чтобы иметь возможность обрабатывать его параллельно.
Мы можем применить другое поведение, отбросив метаинформацию, которая приводит к этой стратегии выполнения:
StreamSupport.stream(new DebugSpliterator(), true)
.filter(x -> true)
.limit(20)
.forEach(x -> {});
Поскольку Stream API не знает о поведении предиката, конвейер теряет SIZED
характеристика, ведущая к
trySplit() [0, 500000, 1000000]
trySplit() [500000, 750000, 1000000]
trySplit() [500000, 625000, 750000]
trySplit() [625000, 687500, 750000]
trySplit() [625000, 656250, 687500]
trySplit() [656250, 671875, 687500]
trySplit() [0, 250000, 500000]
trySplit() [750000, 875000, 1000000]
trySplit() [250000, 375000, 500000]
trySplit() [0, 125000, 250000]
trySplit() [250000, 312500, 375000]
trySplit() [312500, 343750, 375000]
trySplit() [125000, 187500, 250000]
trySplit() [875000, 937500, 1000000]
trySplit() [375000, 437500, 500000]
trySplit() [125000, 156250, 187500]
trySplit() [250000, 281250, 312500]
trySplit() [750000, 812500, 875000]
trySplit() [281250, 296875, 312500]
trySplit() [156250, 171875, 187500]
trySplit() [437500, 468750, 500000]
trySplit() [0, 62500, 125000]
trySplit() [875000, 906250, 937500]
trySplit() [62500, 93750, 125000]
trySplit() [812500, 843750, 875000]
trySplit() [906250, 921875, 937500]
trySplit() [0, 31250, 62500]
trySplit() [31250, 46875, 62500]
trySplit() [46875, 54687, 62500]
trySplit() [54687, 58593, 62500]
trySplit() [58593, 60546, 62500]
trySplit() [60546, 61523, 62500]
trySplit() [61523, 62011, 62500]
trySplit() [62011, 62255, 62500]
который показывает меньше trySplit
вызовы, но не улучшение; рассмотрение чисел показывает, что теперь диапазоны за пределами результирующего диапазона элементов (если мы используем наши знания о том, что все элементы будут проходить через файлер) обрабатываются, что еще хуже, диапазон результирующих элементов полностью покрывается одним сплитератором, в результате чего нет параллели обрабатывая наши элементы результата вообще, все остальные потоки обрабатывали элементы, которые впоследствии отбрасывались.
Конечно, мы могли бы легко обеспечить оптимальное разделение для нашей задачи, изменив
int mid = (current+fence)>>>1;
в
int mid = fence>20? 20: (current+fence)>>>1;
так
StreamSupport.stream(new DebugSpliterator(), true)
.limit(20)
.forEach(x -> {});
результаты в
trySplit() [0, 20, 1000000]
trySplit() [0, 10, 20]
trySplit() [10, 15, 20]
trySplit() [10, 12, 15]
trySplit() [12, 13, 15]
trySplit() [0, 5, 10]
trySplit() [15, 17, 20]
trySplit() [5, 7, 10]
trySplit() [0, 2, 5]
trySplit() [17, 18, 20]
trySplit() [2, 3, 5]
trySplit() [5, 6, 7]
trySplit() [15, 16, 17]
trySplit() [6, 6, 7]
trySplit() [16, 16, 17]
trySplit() [0, 1, 2]
trySplit() [7, 8, 10]
trySplit() [8, 9, 10]
trySplit() [1, 1, 2]
trySplit() [3, 4, 5]
trySplit() [9, 9, 10]
trySplit() [18, 19, 20]
trySplit() [10, 11, 12]
trySplit() [13, 14, 15]
trySplit() [11, 11, 12]
trySplit() [4, 4, 5]
trySplit() [14, 14, 15]
но это был бы не сплитератор общего назначения, а тот, который работает плохо, если предел не двадцать.
Если мы сможем включить ограничение в сплитератор или, в более общем случае, в источник потока, у нас не будет этой проблемы. Так что вместо list.stream().limit(x)
Вы можете позвонить list.subList(0, Math.min(x, list.size())).stream()
, вместо random.ints().limit(x)
использовать random.ints(x)
, вместо Stream.generate(generator).limit(x)
вы можете использовать LongStream.range(0, x).mapToObj( index -> generator.get())
или используйте фабричный метод этого ответа.
Для произвольного потокового источника / сплитератора, применяя limit
может быть довольно дорого для параллельных потоков, что даже задокументировано. Ну и имеющие побочные эффекты в trySplit
это плохая идея в первую очередь.
Я не думаю, что это ошибка в любом случае, но все же очень интересная идея, что tryAdvance
может иметь побочные эффекты.
Это было бы вполне возможно, насколько я понимаю, для случая, когда ваш trySplit
не разбивает на отдельные элементы пакета.
Например, у вас есть массив, и вы хотите разделить его (через trySplit
) на части подмассивов не менее 4-х элементов в каждом. В таком случае, когда вы не можете больше разделить (вы достигли минимум 4 элементов в текущем Spliterator
например), когда начнется обработка - forEachRemaning
будет называться; в свою очередь это будет по умолчанию для вызова tryAdvance
для каждого элемента в текущем Spliterator
, как видно из реализации по умолчанию:
default void forEachRemaining(Consumer<? super T> action) {
do { } while (tryAdvance(action));
}
Очевидно, так как вы выполняете работу параллельно - как только поток начинает свою работу (читайте executing it's forEachRemaning
), это не может быть остановлено больше - так много элементов ударит tryAdvance
,
Таким образом, я действительно не думаю, что есть способ сделать это, кроме интеграции этого в Spliterator
сам; Я думаю, что это должно работать:
static class LimitingSpliterator<T> implements Spliterator<T> {
private int limit;
private final Supplier<T> generator;
private LimitingSpliterator(Supplier<T> generator, int limit) {
this.limit = limit;
this.generator = generator;
}
static <T> LimitingSpliterator<T> of(Supplier<T> supplier, int limit) {
return new LimitingSpliterator<>(supplier, limit);
}
@Override
public boolean tryAdvance(final Consumer<? super T> consumer) {
Objects.requireNonNull(consumer);
if (limit > 0) {
--limit;
generator.get();
consumer.accept(generator.get());
return true;
}
return false;
}
@Override
public void forEachRemaining(final Consumer<? super T> consumer) {
while (limit > 0) {
consumer.accept(generator.get());
--limit;
}
}
@Override
public LimitingSpliterator<T> trySplit() {
int half = limit >> 2;
limit = limit - half;
return new LimitingSpliterator<>(generator, half);
}
@Override
public long estimateSize() {
return limit << 2;
}
@Override
public int characteristics() {
return SIZED;
}
}
Для моего варианта использования решение было использовать:
LongStream.range(0, streamSize).unordered().parallel().mapToInt(ignored -> nextInt())
NB: Это был поток случайных чисел из PRNG, которые могли бы постоянно повторяться.