Как остановить потоки?

Мне было интересно, когда я создал свой собственный бесконечный поток с Stream.generate как останавливаются потоки, находящиеся в стандартной библиотеке...

Например, когда у вас есть список с записями:

List<Record> records = getListWithRecords();
records.stream().forEach(/* do something */);

Поток не будет бесконечным и будет работать вечно, но он остановится, когда пройдут все элементы в списке. Но как это работает? Та же функциональность применяется для потока, созданного Files.lines(path) (источник: http://www.mkyong.com/java8/java-8-stream-read-a-file-line-by-line/).

И второй вопрос, как можно создать поток с Stream.generate быть остановлен таким же образом, то?

2 ответа

Решение

Конечные потоки просто не создаются через Stream.generate,

Стандартный способ реализации потока, это реализовать Spliterator иногда используя Iterator объезд В любом случае реализация имеет способ сообщить о конце, например, когда Spliterator.tryAdvance возвращается false или его forEachRemaining метод просто возвращает, или в случае Iterator источник, когда hasNext() возвращается false,

Spliterator может даже сообщить ожидаемое количество элементов до начала обработки.

Потоки, созданные одним из заводских методов внутри Stream интерфейс, как Stream.generate может быть реализован либо Spliterator или используя внутренние возможности реализации потока, но независимо от того, как они реализованы, вы не получите руки к этой реализации, чтобы изменить их поведение, поэтому единственный способ сделать такой поток конечным, это связать limit операция в потоке.

Если вы хотите создать непустой конечный поток, который не поддерживается массивом или коллекцией, и ни один из существующих источников потока не подходит, вы должны реализовать свой собственный Spliterator и создать поток из этого. Как сказано выше, вы можете использовать существующий метод для создания Spliterator из Iterator, но вы должны противостоять искушению использовать Iterator только потому что это знакомо Spliterator не сложно реализовать:

/** like {@code Stream.generate}, but with an intrinsic limit */
static <T> Stream<T> generate(Supplier<T> s, long count) {
    return StreamSupport.stream(
               new Spliterators.AbstractSpliterator<T>(count, Spliterator.SIZED) {
        long remaining=count;

        public boolean tryAdvance(Consumer<? super T> action) {
            if(remaining<=0) return false;
            remaining--;
            action.accept(s.get());
            return true;
        }
    }, false);
}

С этой отправной точки вы можете добавить переопределения для default методы Spliterator интерфейс, взвешивание затрат на разработку и потенциальные улучшения производительности, например

static <T> Stream<T> generate(Supplier<T> s, long count) {
    return StreamSupport.stream(
               new Spliterators.AbstractSpliterator<T>(count, Spliterator.SIZED) {
        long remaining=count;

        public boolean tryAdvance(Consumer<? super T> action) {
            if(remaining<=0) return false;
            remaining--;
            action.accept(s.get());
            return true;
        }

        /** May improve the performance of most non-short-circuiting operations */
        @Override
        public void forEachRemaining(Consumer<? super T> action) {
            long toGo=remaining;
            remaining=0;
            for(; toGo>0; toGo--) action.accept(s.get());
        }
    }, false);
}

Я создал общий обходной путь для этого

public class GuardedSpliterator<T> implements Spliterator<T> {

  final Supplier<? extends T> generator;

  final Predicate<T> termination;

  final boolean inclusive;

  public GuardedSpliterator(Supplier<? extends T> generator, Predicate<T> termination, boolean inclusive) {
    this.generator = generator;
    this.termination = termination;
    this.inclusive = inclusive;
  }

  @Override
  public boolean tryAdvance(Consumer<? super T> action) {
    T next = generator.get(); 
    boolean end = termination.test(next);
    if (inclusive || !end) {
      action.accept(next);
    }
    return !end;
  }

  @Override
  public Spliterator<T> trySplit() {
    throw new UnsupportedOperationException("Not supported yet.");
  }

  @Override
  public long estimateSize() {
    throw new UnsupportedOperationException("Not supported yet.");
  }

  @Override
  public int characteristics() {
    return Spliterator.ORDERED;
  }

}

Использование довольно просто:

GuardedSpliterator<Integer> source = new GuardedSpliterator<>(
    ()  -> rnd.nextInt(),
    (i) -> i > 10,
    true
);

Stream<Integer> ints = StreamSupport.stream(source, false);

ints.forEach(i -> System.out.println(i));    
Другие вопросы по тегам