Как остановить потоки?
Мне было интересно, когда я создал свой собственный бесконечный поток с Stream.generate
как останавливаются потоки, находящиеся в стандартной библиотеке...
Например, когда у вас есть список с записями:
List<Record> records = getListWithRecords();
records.stream().forEach(/* do something */);
Поток не будет бесконечным и будет работать вечно, но он остановится, когда пройдут все элементы в списке. Но как это работает? Та же функциональность применяется для потока, созданного Files.lines(path)
(источник: http://www.mkyong.com/java8/java-8-stream-read-a-file-line-by-line/).
И второй вопрос, как можно создать поток с Stream.generate
быть остановлен таким же образом, то?
2 ответа
Конечные потоки просто не создаются через Stream.generate
,
Стандартный способ реализации потока, это реализовать Spliterator
иногда используя Iterator
объезд В любом случае реализация имеет способ сообщить о конце, например, когда Spliterator.tryAdvance
возвращается false
или его forEachRemaining
метод просто возвращает, или в случае Iterator
источник, когда hasNext()
возвращается false
,
Spliterator
может даже сообщить ожидаемое количество элементов до начала обработки.
Потоки, созданные одним из заводских методов внутри Stream
интерфейс, как Stream.generate
может быть реализован либо Spliterator
или используя внутренние возможности реализации потока, но независимо от того, как они реализованы, вы не получите руки к этой реализации, чтобы изменить их поведение, поэтому единственный способ сделать такой поток конечным, это связать limit
операция в потоке.
Если вы хотите создать непустой конечный поток, который не поддерживается массивом или коллекцией, и ни один из существующих источников потока не подходит, вы должны реализовать свой собственный Spliterator
и создать поток из этого. Как сказано выше, вы можете использовать существующий метод для создания Spliterator
из Iterator
, но вы должны противостоять искушению использовать Iterator
только потому что это знакомо Spliterator
не сложно реализовать:
/** like {@code Stream.generate}, but with an intrinsic limit */
static <T> Stream<T> generate(Supplier<T> s, long count) {
return StreamSupport.stream(
new Spliterators.AbstractSpliterator<T>(count, Spliterator.SIZED) {
long remaining=count;
public boolean tryAdvance(Consumer<? super T> action) {
if(remaining<=0) return false;
remaining--;
action.accept(s.get());
return true;
}
}, false);
}
С этой отправной точки вы можете добавить переопределения для default
методы Spliterator
интерфейс, взвешивание затрат на разработку и потенциальные улучшения производительности, например
static <T> Stream<T> generate(Supplier<T> s, long count) {
return StreamSupport.stream(
new Spliterators.AbstractSpliterator<T>(count, Spliterator.SIZED) {
long remaining=count;
public boolean tryAdvance(Consumer<? super T> action) {
if(remaining<=0) return false;
remaining--;
action.accept(s.get());
return true;
}
/** May improve the performance of most non-short-circuiting operations */
@Override
public void forEachRemaining(Consumer<? super T> action) {
long toGo=remaining;
remaining=0;
for(; toGo>0; toGo--) action.accept(s.get());
}
}, false);
}
Я создал общий обходной путь для этого
public class GuardedSpliterator<T> implements Spliterator<T> {
final Supplier<? extends T> generator;
final Predicate<T> termination;
final boolean inclusive;
public GuardedSpliterator(Supplier<? extends T> generator, Predicate<T> termination, boolean inclusive) {
this.generator = generator;
this.termination = termination;
this.inclusive = inclusive;
}
@Override
public boolean tryAdvance(Consumer<? super T> action) {
T next = generator.get();
boolean end = termination.test(next);
if (inclusive || !end) {
action.accept(next);
}
return !end;
}
@Override
public Spliterator<T> trySplit() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public long estimateSize() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public int characteristics() {
return Spliterator.ORDERED;
}
}
Использование довольно просто:
GuardedSpliterator<Integer> source = new GuardedSpliterator<>(
() -> rnd.nextInt(),
(i) -> i > 10,
true
);
Stream<Integer> ints = StreamSupport.stream(source, false);
ints.forEach(i -> System.out.println(i));