Ограничить поток предикатом

Существует ли потоковая операция Java 8, ограничивающая (потенциально бесконечная) Stream пока первый элемент не соответствует предикату?

В Java 9 мы можем использовать takeWhile как в примере ниже, чтобы напечатать все числа меньше 10.

IntStream
    .iterate(1, n -> n + 1)
    .takeWhile(n -> n < 10)
    .forEach(System.out::println);

Поскольку в Java 8 такой операции нет, каков наилучший способ ее реализации в общем виде?

18 ответов

Решение

Такая операция должна быть возможна с Java 8 Stream, но это не обязательно может быть сделано эффективно - например, вы не можете обязательно распараллелить такую ​​операцию, поскольку вы должны смотреть на элементы по порядку.

API не предоставляет простой способ сделать это, но что, вероятно, самый простой способ, это взять Stream.iterator()оберните Iterator иметь реализацию "на время", а затем вернуться к Spliterator а затем Stream, Или - может быть - обернуть Spliteratorхотя в этой реализации его уже нельзя разделить.

Вот непроверенная реализация takeWhile на Spliterator:

static <T> Spliterator<T> takeWhile(
    Spliterator<T> splitr, Predicate<? super T> predicate) {
  return new Spliterators.AbstractSpliterator<T>(splitr.estimateSize(), 0) {
    boolean stillGoing = true;
    @Override public boolean tryAdvance(Consumer<? super T> consumer) {
      if (stillGoing) {
        boolean hadNext = splitr.tryAdvance(elem -> {
          if (predicate.test(elem)) {
            consumer.accept(elem);
          } else {
            stillGoing = false;
          }
        });
        return hadNext && stillGoing;
      }
      return false;
    }
  };
}

static <T> Stream<T> takeWhile(Stream<T> stream, Predicate<? super T> predicate) {
   return StreamSupport.stream(takeWhile(stream.spliterator(), predicate), false);
}

Операции takeWhile а также dropWhile были добавлены в JDK 9. Ваш пример кода

IntStream
    .iterate(1, n -> n + 1)
    .takeWhile(n -> n < 10)
    .forEach(System.out::println);

будет вести себя точно так, как вы ожидаете, при компиляции и запуске под JDK 9.

JDK 9 был выпущен. Он доступен для скачивания здесь: http://jdk.java.net/9/

allMatch() является функцией короткого замыкания, так что вы можете использовать ее, чтобы остановить обработку. Основным недостатком является то, что вы должны сделать свой тест дважды: один раз, чтобы увидеть, нужно ли его обрабатывать, и еще раз, чтобы увидеть, стоит ли продолжать.

IntStream
    .iterate(1, n -> n + 1)
    .peek(n->{if (n<10) System.out.println(n);})
    .allMatch(n->n < 10);

В качестве продолжения ответа @StuartMarks. Моя библиотека StreamEx имеет takeWhile операция, которая совместима с текущей реализацией JDK-9. При работе под JDK-9 он просто делегирует реализацию JDK (через MethodHandle.invokeExact что действительно быстро). При работе под JDK-8 будет использоваться реализация "polyfill". Таким образом, используя мою библиотеку, проблема может быть решена так:

IntStreamEx.iterate(1, n -> n + 1)
           .takeWhile(n -> n < 10)
           .forEach(System.out::println);

takeWhile является одной из функций, предоставляемых библиотекой protonpack.

Stream<Integer> infiniteInts = Stream.iterate(0, i -> i + 1);
Stream<Integer> finiteInts = StreamUtils.takeWhile(infiniteInts, i -> i < 10);

assertThat(finiteInts.collect(Collectors.toList()),
           hasSize(10));

Обновление: Java 9 Stream теперь идет с методом takeWhile.

Нет необходимости для взлома или других решений. Просто используйте это!


Я уверен, что это может быть значительно улучшено: (кто-то может сделать это потокобезопасным, может быть)

Stream<Integer> stream = Stream.iterate(0, n -> n + 1);

TakeWhile.stream(stream, n -> n < 10000)
         .forEach(n -> System.out.print((n == 0 ? "" + n : "," + n)));

Взломать наверняка... Не элегантно - но это работает ~:D

class TakeWhile<T> implements Iterator<T> {

    private final Iterator<T> iterator;
    private final Predicate<T> predicate;
    private volatile T next;
    private volatile boolean keepGoing = true;

    public TakeWhile(Stream<T> s, Predicate<T> p) {
        this.iterator = s.iterator();
        this.predicate = p;
    }

    @Override
    public boolean hasNext() {
        if (!keepGoing) {
            return false;
        }
        if (next != null) {
            return true;
        }
        if (iterator.hasNext()) {
            next = iterator.next();
            keepGoing = predicate.test(next);
            if (!keepGoing) {
                next = null;
            }
        }
        return next != null;
    }

    @Override
    public T next() {
        if (next == null) {
            if (!hasNext()) {
                throw new NoSuchElementException("Sorry. Nothing for you.");
            }
        }
        T temp = next;
        next = null;
        return temp;
    }

    public static <T> Stream<T> stream(Stream<T> s, Predicate<T> p) {
        TakeWhile tw = new TakeWhile(s, p);
        Spliterator split = Spliterators.spliterator(tw, Integer.MAX_VALUE, Spliterator.ORDERED);
        return StreamSupport.stream(split, false);
    }

}

Вы можете использовать java8 + rxjava.

import java.util.stream.IntStream;
import rx.Observable;


// Example 1)
IntStream intStream  = IntStream.iterate(1, n -> n + 1);
Observable.from(() -> intStream.iterator())
    .takeWhile(n ->
          {
                System.out.println(n);
                return n < 10;
          }
    ).subscribe() ;


// Example 2
IntStream intStream  = IntStream.iterate(1, n -> n + 1);
Observable.from(() -> intStream.iterator())
    .takeWhile(n -> n < 10)
    .forEach( n -> System.out.println(n));

На самом деле есть два способа сделать это в Java 8 без каких-либо дополнительных библиотек или с помощью Java 9.

Если вы хотите напечатать числа от 2 до 20 на консоли, вы можете сделать это:

IntStream.iterate(2, (i) -> i + 2).peek(System.out::println).allMatch(i -> i < 20);

или же

IntStream.iterate(2, (i) -> i + 2).peek(System.out::println).anyMatch(i -> i >= 20);

Выход в обоих случаях:

2
4
6
8
10
12
14
16
18
20

Никто не упомянул что- либо еще. Это причина этого поста.

Это источник, скопированный из JDK 9 java.util.stream.Stream.takeWhile(Predicate). Небольшая разница для работы с JDK 8.

static <T> Stream<T> takeWhile(Stream<T> stream, Predicate<? super T> p) {
    class Taking extends Spliterators.AbstractSpliterator<T> implements Consumer<T> {
        private static final int CANCEL_CHECK_COUNT = 63;
        private final Spliterator<T> s;
        private int count;
        private T t;
        private final AtomicBoolean cancel = new AtomicBoolean();
        private boolean takeOrDrop = true;

        Taking(Spliterator<T> s) {
            super(s.estimateSize(), s.characteristics() & ~(Spliterator.SIZED | Spliterator.SUBSIZED));
            this.s = s;
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            boolean test = true;
            if (takeOrDrop &&               // If can take
                    (count != 0 || !cancel.get()) && // and if not cancelled
                    s.tryAdvance(this) &&   // and if advanced one element
                    (test = p.test(t))) {   // and test on element passes
                action.accept(t);           // then accept element
                return true;
            } else {
                // Taking is finished
                takeOrDrop = false;
                // Cancel all further traversal and splitting operations
                // only if test of element failed (short-circuited)
                if (!test)
                    cancel.set(true);
                return false;
            }
        }

        @Override
        public Comparator<? super T> getComparator() {
            return s.getComparator();
        }

        @Override
        public void accept(T t) {
            count = (count + 1) & CANCEL_CHECK_COUNT;
            this.t = t;
        }

        @Override
        public Spliterator<T> trySplit() {
            return null;
        }
    }
    return StreamSupport.stream(new Taking(stream.spliterator()), stream.isParallel()).onClose(stream::close);
}

Вот версия, сделанная для ints - как задано в вопросе.

Использование:

StreamUtil.takeWhile(IntStream.iterate(1, n -> n + 1), n -> n < 10);

Вот код для StreamUtil:

import java.util.PrimitiveIterator;
import java.util.Spliterators;
import java.util.function.IntConsumer;
import java.util.function.IntPredicate;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

public class StreamUtil
{
    public static IntStream takeWhile(IntStream stream, IntPredicate predicate)
    {
        return StreamSupport.intStream(new PredicateIntSpliterator(stream, predicate), false);
    }

    private static class PredicateIntSpliterator extends Spliterators.AbstractIntSpliterator
    {
        private final PrimitiveIterator.OfInt iterator;
        private final IntPredicate predicate;

        public PredicateIntSpliterator(IntStream stream, IntPredicate predicate)
        {
            super(Long.MAX_VALUE, IMMUTABLE);
            this.iterator = stream.iterator();
            this.predicate = predicate;
        }

        @Override
        public boolean tryAdvance(IntConsumer action)
        {
            if (iterator.hasNext()) {
                int value = iterator.nextInt();
                if (predicate.test(value)) {
                    action.accept(value);
                    return true;
                }
            }

            return false;
        }
    }
}

Пойдите, чтобы получить библиотеку AbacusUtil. Он предоставляет точный API, который вы хотите, и многое другое:

IntStream.iterate(1, n -> n + 1).takeWhile(n -> n < 10).forEach(System.out::println);

Декларация: я разработчик AbacusUtil.

Если вы знаете точное количество повторений, которые будут выполнены, вы можете сделать

IntStream
          .iterate(1, n -> n + 1)
          .limit(10)
          .forEach(System.out::println);
    IntStream.iterate(1, n -> n + 1)
    .peek(System.out::println) //it will be executed 9 times
    .filter(n->n>=9)
    .findAny();

вместо пика вы можете использовать mapToObj для возврата конечного объекта или сообщения

    IntStream.iterate(1, n -> n + 1)
    .mapToObj(n->{   //it will be executed 9 times
            if(n<9)
                return "";
            return "Loop repeats " + n + " times";});
    .filter(message->!message.isEmpty())
    .findAny()
    .ifPresent(System.out::println);

Даже у меня было похожее требование - запусти веб-сервис, если не получится, повторите его 3 раза. Если это не удается даже после этих многочисленных испытаний, отправьте уведомление по электронной почте. После того, как много гуглил, anyMatch() пришел как спаситель. Мой пример кода выглядит следующим образом. В следующем примере, если метод webServiceCall возвращает true в самой первой итерации, поток не выполняет дальнейшую итерацию, как мы вызывали anyMatch(), Я считаю, это то, что вы ищете.

import java.util.stream.IntStream;

import io.netty.util.internal.ThreadLocalRandom;

class TrialStreamMatch {

public static void main(String[] args) {        
    if(!IntStream.range(1,3).anyMatch(integ -> webServiceCall(integ))){
         //Code for sending email notifications
    }
}

public static boolean webServiceCall(int i){
    //For time being, I have written a code for generating boolean randomly
    //This whole piece needs to be replaced by actual web-service client code
    boolean bool = ThreadLocalRandom.current().nextBoolean();
    System.out.println("Iteration index :: "+i+" bool :: "+bool);

    //Return success status -- true or false
    return bool;
}

Вы не можете прервать поток, за исключением короткого замыкания в терминальной операции, в результате чего некоторые значения потока остаются необработанными независимо от их значения. Но если вы просто хотите избежать операций с потоком, вы можете добавить преобразование и фильтр к потоку:

import java.util.Objects;

class ThingProcessor
{
    static Thing returnNullOnCondition(Thing thing)
    {    return( (*** is condition met ***)? null : thing);    }

    void processThings(Collection<Thing> thingsCollection)
    {
        thingsCollection.stream()
        *** regular stream processing ***
        .map(ThingProcessor::returnNullOnCondition)
        .filter(Objects::nonNull)
        *** continue stream processing ***
    }
} // class ThingProcessor

Это преобразует поток вещей в нули, когда вещи удовлетворяют некоторому условию, а затем отфильтровывает нули. Если вы хотите побаловать себя побочными эффектами, вы можете установить значение условия в true, как только что-то встретится, поэтому все последующие вещи будут отфильтрованы независимо от их значения. Но даже если нет, вы можете сохранить большую часть (если не совсем всю) обработку, отфильтровывая значения из потока, который вы не хотите обрабатывать.

Если у вас другая проблема, может потребоваться другое решение, но для вашей текущей проблемы я бы просто сказал:

IntStream
    .iterate(1, n -> n + 1)
    .limit(10)
    .forEach(System.out::println);

У меня есть другое быстрое решение путем реализации этого (который на самом деле нечист, но вы поняли):

public static void main(String[] args) {
    System.out.println(StreamUtil.iterate(1, o -> o + 1).terminateOn(15)
            .map(o -> o.toString()).collect(Collectors.joining(", ")));
}

static interface TerminatedStream<T> {
    Stream<T> terminateOn(T e);
}

static class StreamUtil {
    static <T> TerminatedStream<T> iterate(T seed, UnaryOperator<T> op) {
        return new TerminatedStream<T>() {
            public Stream<T> terminateOn(T e) {
                Builder<T> builder = Stream.<T> builder().add(seed);
                T current = seed;
                while (!current.equals(e)) {
                    current = op.apply(current);
                    builder.add(current);
                }
                return builder.build();
            }
        };
    }
}

Может быть, немного не по теме, но это то, что мы имеем для List<T> скорее, чем Stream<T>,

Сначала вам нужно иметь take Утилита метод. Этот метод занимает первое n элементы:

static <T> List<T> take(List<T> l, int n) {
    if (n <= 0) {
        return newArrayList();
    } else {
        int takeTo = Math.min(Math.max(n, 0), l.size());
        return l.subList(0, takeTo);
    }
}

это просто работает как scala.List.take

    assertEquals(newArrayList(1, 2, 3), take(newArrayList(1, 2, 3, 4, 5), 3));
    assertEquals(newArrayList(1, 2, 3), take(newArrayList(1, 2, 3), 5));

    assertEquals(newArrayList(), take(newArrayList(1, 2, 3), -1));
    assertEquals(newArrayList(), take(newArrayList(1, 2, 3), 0));

теперь будет довольно просто написать takeWhile метод, основанный на take

static <T> List<T> takeWhile(List<T> l, Predicate<T> p) {
    return l.stream().
            filter(p.negate()).findFirst(). // find first element when p is false
            map(l::indexOf).        // find the index of that element
            map(i -> take(l, i)).   // take up to the index
            orElse(l);  // return full list if p is true for all elements
}

это работает так:

    assertEquals(newArrayList(1, 2, 3), takeWhile(newArrayList(1, 2, 3, 4, 3, 2, 1), i -> i < 4));

эта реализация частично повторяет список несколько раз, но не добавляет O(n^2) операции. Надеюсь, что это приемлемо.

Вот моя попытка использовать только библиотеку Java Stream.

        IntStream.iterate(0, i -> i + 1)
        .filter(n -> {
                if (n < 10) {
                    System.out.println(n);
                    return false;
                } else {
                    return true;
                }
            })
        .findAny();
Другие вопросы по тегам