Ограничить поток предикатом
Существует ли потоковая операция Java 8, ограничивающая (потенциально бесконечная) Stream
пока первый элемент не соответствует предикату?
В Java 9 мы можем использовать takeWhile
как в примере ниже, чтобы напечатать все числа меньше 10.
IntStream
.iterate(1, n -> n + 1)
.takeWhile(n -> n < 10)
.forEach(System.out::println);
Поскольку в Java 8 такой операции нет, каков наилучший способ ее реализации в общем виде?
18 ответов
Такая операция должна быть возможна с Java 8 Stream
, но это не обязательно может быть сделано эффективно - например, вы не можете обязательно распараллелить такую операцию, поскольку вы должны смотреть на элементы по порядку.
API не предоставляет простой способ сделать это, но что, вероятно, самый простой способ, это взять Stream.iterator()
оберните Iterator
иметь реализацию "на время", а затем вернуться к Spliterator
а затем Stream
, Или - может быть - обернуть Spliterator
хотя в этой реализации его уже нельзя разделить.
Вот непроверенная реализация takeWhile
на Spliterator
:
static <T> Spliterator<T> takeWhile(
Spliterator<T> splitr, Predicate<? super T> predicate) {
return new Spliterators.AbstractSpliterator<T>(splitr.estimateSize(), 0) {
boolean stillGoing = true;
@Override public boolean tryAdvance(Consumer<? super T> consumer) {
if (stillGoing) {
boolean hadNext = splitr.tryAdvance(elem -> {
if (predicate.test(elem)) {
consumer.accept(elem);
} else {
stillGoing = false;
}
});
return hadNext && stillGoing;
}
return false;
}
};
}
static <T> Stream<T> takeWhile(Stream<T> stream, Predicate<? super T> predicate) {
return StreamSupport.stream(takeWhile(stream.spliterator(), predicate), false);
}
Операции takeWhile
а также dropWhile
были добавлены в JDK 9. Ваш пример кода
IntStream
.iterate(1, n -> n + 1)
.takeWhile(n -> n < 10)
.forEach(System.out::println);
будет вести себя точно так, как вы ожидаете, при компиляции и запуске под JDK 9.
JDK 9 был выпущен. Он доступен для скачивания здесь: http://jdk.java.net/9/
allMatch()
является функцией короткого замыкания, так что вы можете использовать ее, чтобы остановить обработку. Основным недостатком является то, что вы должны сделать свой тест дважды: один раз, чтобы увидеть, нужно ли его обрабатывать, и еще раз, чтобы увидеть, стоит ли продолжать.
IntStream
.iterate(1, n -> n + 1)
.peek(n->{if (n<10) System.out.println(n);})
.allMatch(n->n < 10);
В качестве продолжения ответа @StuartMarks. Моя библиотека StreamEx имеет takeWhile
операция, которая совместима с текущей реализацией JDK-9. При работе под JDK-9 он просто делегирует реализацию JDK (через MethodHandle.invokeExact
что действительно быстро). При работе под JDK-8 будет использоваться реализация "polyfill". Таким образом, используя мою библиотеку, проблема может быть решена так:
IntStreamEx.iterate(1, n -> n + 1)
.takeWhile(n -> n < 10)
.forEach(System.out::println);
takeWhile
является одной из функций, предоставляемых библиотекой protonpack.
Stream<Integer> infiniteInts = Stream.iterate(0, i -> i + 1);
Stream<Integer> finiteInts = StreamUtils.takeWhile(infiniteInts, i -> i < 10);
assertThat(finiteInts.collect(Collectors.toList()),
hasSize(10));
Обновление: Java 9 Stream
теперь идет с методом takeWhile.
Нет необходимости для взлома или других решений. Просто используйте это!
Я уверен, что это может быть значительно улучшено: (кто-то может сделать это потокобезопасным, может быть)
Stream<Integer> stream = Stream.iterate(0, n -> n + 1);
TakeWhile.stream(stream, n -> n < 10000)
.forEach(n -> System.out.print((n == 0 ? "" + n : "," + n)));
Взломать наверняка... Не элегантно - но это работает ~:D
class TakeWhile<T> implements Iterator<T> {
private final Iterator<T> iterator;
private final Predicate<T> predicate;
private volatile T next;
private volatile boolean keepGoing = true;
public TakeWhile(Stream<T> s, Predicate<T> p) {
this.iterator = s.iterator();
this.predicate = p;
}
@Override
public boolean hasNext() {
if (!keepGoing) {
return false;
}
if (next != null) {
return true;
}
if (iterator.hasNext()) {
next = iterator.next();
keepGoing = predicate.test(next);
if (!keepGoing) {
next = null;
}
}
return next != null;
}
@Override
public T next() {
if (next == null) {
if (!hasNext()) {
throw new NoSuchElementException("Sorry. Nothing for you.");
}
}
T temp = next;
next = null;
return temp;
}
public static <T> Stream<T> stream(Stream<T> s, Predicate<T> p) {
TakeWhile tw = new TakeWhile(s, p);
Spliterator split = Spliterators.spliterator(tw, Integer.MAX_VALUE, Spliterator.ORDERED);
return StreamSupport.stream(split, false);
}
}
Вы можете использовать java8 + rxjava.
import java.util.stream.IntStream;
import rx.Observable;
// Example 1)
IntStream intStream = IntStream.iterate(1, n -> n + 1);
Observable.from(() -> intStream.iterator())
.takeWhile(n ->
{
System.out.println(n);
return n < 10;
}
).subscribe() ;
// Example 2
IntStream intStream = IntStream.iterate(1, n -> n + 1);
Observable.from(() -> intStream.iterator())
.takeWhile(n -> n < 10)
.forEach( n -> System.out.println(n));
На самом деле есть два способа сделать это в Java 8 без каких-либо дополнительных библиотек или с помощью Java 9.
Если вы хотите напечатать числа от 2 до 20 на консоли, вы можете сделать это:
IntStream.iterate(2, (i) -> i + 2).peek(System.out::println).allMatch(i -> i < 20);
или же
IntStream.iterate(2, (i) -> i + 2).peek(System.out::println).anyMatch(i -> i >= 20);
Выход в обоих случаях:
2
4
6
8
10
12
14
16
18
20
Никто не упомянул что- либо еще. Это причина этого поста.
Это источник, скопированный из JDK 9 java.util.stream.Stream.takeWhile(Predicate). Небольшая разница для работы с JDK 8.
static <T> Stream<T> takeWhile(Stream<T> stream, Predicate<? super T> p) {
class Taking extends Spliterators.AbstractSpliterator<T> implements Consumer<T> {
private static final int CANCEL_CHECK_COUNT = 63;
private final Spliterator<T> s;
private int count;
private T t;
private final AtomicBoolean cancel = new AtomicBoolean();
private boolean takeOrDrop = true;
Taking(Spliterator<T> s) {
super(s.estimateSize(), s.characteristics() & ~(Spliterator.SIZED | Spliterator.SUBSIZED));
this.s = s;
}
@Override
public boolean tryAdvance(Consumer<? super T> action) {
boolean test = true;
if (takeOrDrop && // If can take
(count != 0 || !cancel.get()) && // and if not cancelled
s.tryAdvance(this) && // and if advanced one element
(test = p.test(t))) { // and test on element passes
action.accept(t); // then accept element
return true;
} else {
// Taking is finished
takeOrDrop = false;
// Cancel all further traversal and splitting operations
// only if test of element failed (short-circuited)
if (!test)
cancel.set(true);
return false;
}
}
@Override
public Comparator<? super T> getComparator() {
return s.getComparator();
}
@Override
public void accept(T t) {
count = (count + 1) & CANCEL_CHECK_COUNT;
this.t = t;
}
@Override
public Spliterator<T> trySplit() {
return null;
}
}
return StreamSupport.stream(new Taking(stream.spliterator()), stream.isParallel()).onClose(stream::close);
}
Вот версия, сделанная для ints - как задано в вопросе.
Использование:
StreamUtil.takeWhile(IntStream.iterate(1, n -> n + 1), n -> n < 10);
Вот код для StreamUtil:
import java.util.PrimitiveIterator;
import java.util.Spliterators;
import java.util.function.IntConsumer;
import java.util.function.IntPredicate;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;
public class StreamUtil
{
public static IntStream takeWhile(IntStream stream, IntPredicate predicate)
{
return StreamSupport.intStream(new PredicateIntSpliterator(stream, predicate), false);
}
private static class PredicateIntSpliterator extends Spliterators.AbstractIntSpliterator
{
private final PrimitiveIterator.OfInt iterator;
private final IntPredicate predicate;
public PredicateIntSpliterator(IntStream stream, IntPredicate predicate)
{
super(Long.MAX_VALUE, IMMUTABLE);
this.iterator = stream.iterator();
this.predicate = predicate;
}
@Override
public boolean tryAdvance(IntConsumer action)
{
if (iterator.hasNext()) {
int value = iterator.nextInt();
if (predicate.test(value)) {
action.accept(value);
return true;
}
}
return false;
}
}
}
Пойдите, чтобы получить библиотеку AbacusUtil. Он предоставляет точный API, который вы хотите, и многое другое:
IntStream.iterate(1, n -> n + 1).takeWhile(n -> n < 10).forEach(System.out::println);
Декларация: я разработчик AbacusUtil.
Если вы знаете точное количество повторений, которые будут выполнены, вы можете сделать
IntStream
.iterate(1, n -> n + 1)
.limit(10)
.forEach(System.out::println);
IntStream.iterate(1, n -> n + 1)
.peek(System.out::println) //it will be executed 9 times
.filter(n->n>=9)
.findAny();
вместо пика вы можете использовать mapToObj для возврата конечного объекта или сообщения
IntStream.iterate(1, n -> n + 1)
.mapToObj(n->{ //it will be executed 9 times
if(n<9)
return "";
return "Loop repeats " + n + " times";});
.filter(message->!message.isEmpty())
.findAny()
.ifPresent(System.out::println);
Даже у меня было похожее требование - запусти веб-сервис, если не получится, повторите его 3 раза. Если это не удается даже после этих многочисленных испытаний, отправьте уведомление по электронной почте. После того, как много гуглил, anyMatch()
пришел как спаситель. Мой пример кода выглядит следующим образом. В следующем примере, если метод webServiceCall возвращает true в самой первой итерации, поток не выполняет дальнейшую итерацию, как мы вызывали anyMatch()
, Я считаю, это то, что вы ищете.
import java.util.stream.IntStream;
import io.netty.util.internal.ThreadLocalRandom;
class TrialStreamMatch {
public static void main(String[] args) {
if(!IntStream.range(1,3).anyMatch(integ -> webServiceCall(integ))){
//Code for sending email notifications
}
}
public static boolean webServiceCall(int i){
//For time being, I have written a code for generating boolean randomly
//This whole piece needs to be replaced by actual web-service client code
boolean bool = ThreadLocalRandom.current().nextBoolean();
System.out.println("Iteration index :: "+i+" bool :: "+bool);
//Return success status -- true or false
return bool;
}
Вы не можете прервать поток, за исключением короткого замыкания в терминальной операции, в результате чего некоторые значения потока остаются необработанными независимо от их значения. Но если вы просто хотите избежать операций с потоком, вы можете добавить преобразование и фильтр к потоку:
import java.util.Objects;
class ThingProcessor
{
static Thing returnNullOnCondition(Thing thing)
{ return( (*** is condition met ***)? null : thing); }
void processThings(Collection<Thing> thingsCollection)
{
thingsCollection.stream()
*** regular stream processing ***
.map(ThingProcessor::returnNullOnCondition)
.filter(Objects::nonNull)
*** continue stream processing ***
}
} // class ThingProcessor
Это преобразует поток вещей в нули, когда вещи удовлетворяют некоторому условию, а затем отфильтровывает нули. Если вы хотите побаловать себя побочными эффектами, вы можете установить значение условия в true, как только что-то встретится, поэтому все последующие вещи будут отфильтрованы независимо от их значения. Но даже если нет, вы можете сохранить большую часть (если не совсем всю) обработку, отфильтровывая значения из потока, который вы не хотите обрабатывать.
Если у вас другая проблема, может потребоваться другое решение, но для вашей текущей проблемы я бы просто сказал:
IntStream
.iterate(1, n -> n + 1)
.limit(10)
.forEach(System.out::println);
У меня есть другое быстрое решение путем реализации этого (который на самом деле нечист, но вы поняли):
public static void main(String[] args) {
System.out.println(StreamUtil.iterate(1, o -> o + 1).terminateOn(15)
.map(o -> o.toString()).collect(Collectors.joining(", ")));
}
static interface TerminatedStream<T> {
Stream<T> terminateOn(T e);
}
static class StreamUtil {
static <T> TerminatedStream<T> iterate(T seed, UnaryOperator<T> op) {
return new TerminatedStream<T>() {
public Stream<T> terminateOn(T e) {
Builder<T> builder = Stream.<T> builder().add(seed);
T current = seed;
while (!current.equals(e)) {
current = op.apply(current);
builder.add(current);
}
return builder.build();
}
};
}
}
Может быть, немного не по теме, но это то, что мы имеем для List<T>
скорее, чем Stream<T>
,
Сначала вам нужно иметь take
Утилита метод. Этот метод занимает первое n
элементы:
static <T> List<T> take(List<T> l, int n) {
if (n <= 0) {
return newArrayList();
} else {
int takeTo = Math.min(Math.max(n, 0), l.size());
return l.subList(0, takeTo);
}
}
это просто работает как scala.List.take
assertEquals(newArrayList(1, 2, 3), take(newArrayList(1, 2, 3, 4, 5), 3));
assertEquals(newArrayList(1, 2, 3), take(newArrayList(1, 2, 3), 5));
assertEquals(newArrayList(), take(newArrayList(1, 2, 3), -1));
assertEquals(newArrayList(), take(newArrayList(1, 2, 3), 0));
теперь будет довольно просто написать takeWhile
метод, основанный на take
static <T> List<T> takeWhile(List<T> l, Predicate<T> p) {
return l.stream().
filter(p.negate()).findFirst(). // find first element when p is false
map(l::indexOf). // find the index of that element
map(i -> take(l, i)). // take up to the index
orElse(l); // return full list if p is true for all elements
}
это работает так:
assertEquals(newArrayList(1, 2, 3), takeWhile(newArrayList(1, 2, 3, 4, 3, 2, 1), i -> i < 4));
эта реализация частично повторяет список несколько раз, но не добавляет O(n^2)
операции. Надеюсь, что это приемлемо.
Вот моя попытка использовать только библиотеку Java Stream.
IntStream.iterate(0, i -> i + 1)
.filter(n -> {
if (n < 10) {
System.out.println(n);
return false;
} else {
return true;
}
})
.findAny();