Почему.flatMap() так неэффективен (не ленив)?
После первого вопроса о понимании глубоко сплитераторов Java-потоков здесь, еще один тонкий вопрос о потоках: почему реализация .flatMap()
в Java так неэффективно (не ленивый)?
Обычно потоки должны быть как можно более ленивыми, но .flatMap()
метода нет.
Например:
stream.flatMap(this::getStreamWith10HeavyComputationElems).firstFirst()
будет потреблять 10 элементов (10 тяжелых вычислений) перед возвратом первого тяжелого результата вычислений.
stream.flatMap(this::getStreamWith10HeavyComputationElems).limit(11).count()
будет потреблять 20 элементов (2x10 тяжелых вычислений) перед возвратом 11.
Вопрос в том, почему Java использует не ленивую реализацию?
@Test
void flatMap_native() throws Exception {
AtomicInteger count = new AtomicInteger();
Stream<Long> stream = LongStream.range(0, 5).boxed()
.flatMap(num -> LongStream.range(0, 10).boxed()
.peek(x -> count.incrementAndGet()))
.limit(11);
assertThat(stream).hasSize(11);
assertThat(count).hasValue(20); //!why? - should be 11!
}
В качестве обходного пути я создал собственную реализацию flatMap, но ему не хватает беглости по сравнению с нативным вызовом: flatMap(stream, mapper)
против родного stream.flatMap(mapper)
,
public static <T, R> Stream<R> flatMap(Stream<? extends T> stream, Function<? super T, ? extends Stream<? extends R>> mapper) {
// Outside the class to be able to close it, starts with stream.empty
AtomicReference<Stream<? extends R>> flatMapStreamRef = new AtomicReference<>(Stream.empty());
// Defining a better spliterator than the native flatMap one.
class FlatMapSpliterator implements Spliterator<R> {
private final AtomicReference<T> item = new AtomicReference<>();
private final Spliterator<? extends T> spliterator;
private Stream<? extends R> flatMapStream = flatMapStreamRef.get();
private Spliterator<? extends R> flatMapSpliterator = flatMapStream.spliterator();
private FlatMapSpliterator(Spliterator<? extends T> spliterator) {
this.spliterator = spliterator;
}
@Override
public boolean tryAdvance(Consumer<? super R> action) {
while(true) {
if (flatMapSpliterator.tryAdvance(action)) {
return true;
}
if (!spliterator.tryAdvance(item::set)) {
return false; // nothing more to process
}
Stream<? extends R> stream = mapper.apply(item.get());
if(stream != null) {
flatMapStream.close();
flatMapStream = stream;
flatMapStreamRef.set(stream);
flatMapSpliterator = flatMapStream.spliterator();
}
}
}
@Override
@SuppressWarnings("unchecked")
public Spliterator<R> trySplit() {
Spliterator<? extends R> subFlatMapSpliterator = flatMapSpliterator.trySplit();
if(subFlatMapSpliterator != null) {
return (Spliterator<R>) subFlatMapSpliterator;
}
Spliterator<? extends T> subSpliterator = spliterator.trySplit();
if(subSpliterator == null) {
return null;
}
return new FlatMapSpliterator(subSpliterator);
}
@Override
public long estimateSize() {
// If both estimate size are Long.MAX_VALUE then math overflow will happen
long estimateSize = spliterator.estimateSize() + flatMapSpliterator.estimateSize();
return estimateSize < 0 ? Long.MAX_VALUE : estimateSize;
}
@Override
public int characteristics() {
// Maintain only ORDERED (used by native flatMap)
return spliterator.characteristics() & ORDERED;
}
}
return StreamSupport.stream(new FlatMapSpliterator(stream.spliterator()), stream.isParallel())
.onClose(stream::close)
.onClose(flatMapStreamRef.get()::close);
}