Можете ли вы разделить поток на два потока?
У меня есть набор данных, представленный потоком Java 8:
Stream<T> stream = ...;
Я вижу, как отфильтровать его, чтобы получить случайное подмножество - например,
Random r = new Random();
PrimitiveIterator.OfInt coin = r.ints(0, 2).iterator();
Stream<T> heads = stream.filter((x) -> (coin.nextInt() == 0));
Я также вижу, как можно уменьшить этот поток, чтобы получить, например, два списка, представляющих две случайные половины набора данных, а затем превратить их обратно в потоки. Но есть ли прямой способ генерировать два потока из исходного? Что-то вроде
(heads, tails) = stream.[some kind of split based on filter]
Спасибо за понимание.
11 ответов
Не совсем. Вы не можете получить два Stream
из одного; это не имеет смысла - как бы вы перебрали одно без необходимости генерировать другое одновременно? Поток может быть использован только один раз.
Однако, если вы хотите сбросить их в список или что-то, вы можете сделать
stream.forEach((x) -> ((x == 0) ? heads : tails).add(x));
Коллектор может быть использован для этого.
- Для двух категорий используйте
Collectors.partitioningBy()
завод.
Это создаст Map
от Boolean
в List
и положить элементы в один или другой список на основе Predicate
,
Примечание. Поскольку поток должен потребляться целиком, он не может работать с бесконечными потоками. Поскольку поток все равно используется, этот метод просто помещает их в списки вместо создания нового потока с памятью.
Кроме того, нет необходимости в итераторе, даже в приведенном вами примере только с заголовками.
Random r = new Random();
Map<Boolean, List<String>> groups = stream
.collect(Collectors.partitioningBy(x -> r.nextBoolean()));
System.out.println(groups.get(false).size());
System.out.println(groups.get(true).size());
- Для большего количества категорий используйте
Collectors.groupingBy()
завод.
Map<Object, List<String>> groups = stream
.collect(Collectors.groupingBy(x -> r.nextInt(3)));
System.out.println(groups.get(0).size());
System.out.println(groups.get(1).size());
System.out.println(groups.get(2).size());
Если потоки не Stream
, но один из примитивных потоков вроде IntStream
тогда это .collect(Collectors)
метод недоступен. Вам придется делать это вручную, без коллекторской фабрики. Это реализация выглядит так:
IntStream intStream = IntStream.iterate(0, i -> i + 1).limit(1000000);
Predicate<Integer> p = x -> r.nextBoolean();
Map<Boolean, List<Integer>> groups = intStream.collect(() -> {
Map<Boolean, List<Integer>> map = new HashMap<>();
map.put(false, new ArrayList<>());
map.put(true, new ArrayList<>());
return map;
}, (map, x) -> {
boolean partition = p.test(x);
List<Integer> list = map.get(partition);
list.add(x);
}, (map1, map2) -> {
map1.get(false).addAll(map2.get(false));
map1.get(true).addAll(map2.get(true));
});
System.out.println(groups.get(false).size());
System.out.println(groups.get(true).size());
редактировать
Как уже указывалось, вышеупомянутый "обходной путь" не является потокобезопасным. Преобразование в нормальный Stream
перед коллекционированием это путь:
Stream<Integer> stream = intStream.boxed();
Я наткнулся на этот вопрос для себя и чувствую, что у разветвленного потока есть несколько вариантов использования, которые могут оказаться действительными. Я написал приведенный ниже код как потребитель, так что он ничего не делает, но вы можете применить его к функциям и ко всему, с чем вы можете столкнуться.
class PredicateSplitterConsumer<T> implements Consumer<T>
{
private Predicate<T> predicate;
private Consumer<T> positiveConsumer;
private Consumer<T> negativeConsumer;
public PredicateSplitterConsumer(Predicate<T> predicate, Consumer<T> positive, Consumer<T> negative)
{
this.predicate = predicate;
this.positiveConsumer = positive;
this.negativeConsumer = negative;
}
@Override
public void accept(T t)
{
if (predicate.test(t))
{
positiveConsumer.accept(t);
}
else
{
negativeConsumer.accept(t);
}
}
}
Теперь ваша реализация кода может выглядеть примерно так:
personsArray.forEach(
new PredicateSplitterConsumer<>(
person -> person.getDateOfBirth().isPresent(),
person -> System.out.println(person.getName()),
person -> System.out.println(person.getName() + " does not have Date of birth")));
К сожалению, то, что вы просите, прямо не одобряется в JavaDoc Stream:
Поток должен использоваться (вызывая промежуточную или терминальную операцию потока) только один раз. Это исключает, например, "разветвленные" потоки, где один и тот же источник передает два или более конвейеров, или несколько обходов одного и того же потока.
Вы можете обойти это, используя peek
или другие методы, если вы действительно желаете такого поведения. В этом случае вместо попыток создания резервной копии двух потоков из одного исходного источника потока с помощью разветвляющегося фильтра вы должны продублировать свой поток и отфильтровать каждый из дубликатов соответствующим образом.
Тем не менее, вы можете пересмотреть, если Stream
является подходящей структурой для вашего варианта использования.
Вы можете получить два
Stream
s из единицы,
начиная с Java 12 с
teeing
подсчет орлов и решек за 100 подбрасываний монет
Random r = new Random();
PrimitiveIterator.OfInt coin = r.ints(0, 2).iterator();
List<Long> list = Stream.iterate(0, i -> coin.nextInt())
.limit(100).collect(teeing(
filtering(i -> i == 1, counting()),
filtering(i -> i == 0, counting()),
(heads, tails) -> {
return(List.of(heads, tails));
}));
System.err.println("heads:" + list.get(0) + " tails:" + list.get(1));
получает, например:
heads:51 tails:49
Это против общего механизма Stream. Скажем, вы можете разделить Stream S0 на Sa и Sb, как вы хотели. Выполняя любую терминальную операцию, скажем count()
, на Sa будет обязательно "потреблять" все элементы в S0. Поэтому Sb потерял свой источник данных.
Ранее Stream был tee()
метод, я думаю, который дублирует поток до двух. Это удалено сейчас.
В Stream есть метод peek(), но вы можете использовать его для достижения своих требований.
Не совсем, но вы можете выполнить то, что вам нужно, вызвав Collectors.groupingBy()
, вы создаете новую коллекцию, а затем можете создавать экземпляры потоков в этой новой коллекции.
Это был наименее плохой ответ, который я мог придумать.
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
public class Test {
public static <T, L, R> Pair<L, R> splitStream(Stream<T> inputStream, Predicate<T> predicate,
Function<Stream<T>, L> trueStreamProcessor, Function<Stream<T>, R> falseStreamProcessor) {
Map<Boolean, List<T>> partitioned = inputStream.collect(Collectors.partitioningBy(predicate));
L trueResult = trueStreamProcessor.apply(partitioned.get(Boolean.TRUE).stream());
R falseResult = falseStreamProcessor.apply(partitioned.get(Boolean.FALSE).stream());
return new ImmutablePair<L, R>(trueResult, falseResult);
}
public static void main(String[] args) {
Stream<Integer> stream = Stream.iterate(0, n -> n + 1).limit(10);
Pair<List<Integer>, String> results = splitStream(stream,
n -> n > 5,
s -> s.filter(n -> n % 2 == 0).collect(Collectors.toList()),
s -> s.map(n -> n.toString()).collect(Collectors.joining("|")));
System.out.println(results);
}
}
Это берет поток целых чисел и разбивает их на 5. Для тех, кто больше 5, он фильтрует только четные числа и помещает их в список. В остальном он соединяет их с |.
выходы:
([6, 8],0|1|2|3|4|5)
Он не идеален, поскольку собирает все в промежуточные коллекции, разрушая поток (и имеет слишком много аргументов!)
Я наткнулся на этот вопрос, ища способ отфильтровать определенные элементы из потока и зарегистрировать их как ошибки. Поэтому мне не нужно было так сильно разделять поток, как прикреплять преждевременное завершающее действие к предикату с ненавязчивым синтаксисом. Вот что я придумал:
public class MyProcess {
/* Return a Predicate that performs a bail-out action on non-matching items. */
private static <T> Predicate<T> withAltAction(Predicate<T> pred, Consumer<T> altAction) {
return x -> {
if (pred.test(x)) {
return true;
}
altAction.accept(x);
return false;
};
/* Example usage in non-trivial pipeline */
public void processItems(Stream<Item> stream) {
stream.filter(Objects::nonNull)
.peek(this::logItem)
.map(Item::getSubItems)
.filter(withAltAction(SubItem::isValid,
i -> logError(i, "Invalid")))
.peek(this::logSubItem)
.filter(withAltAction(i -> i.size() > 10,
i -> logError(i, "Too large")))
.map(SubItem::toDisplayItem)
.forEach(this::display);
}
}
Укороченная версия, использующая Ломбок
import java.util.function.Consumer;
import java.util.function.Predicate;
import lombok.RequiredArgsConstructor;
/**
* Forks a Stream using a Predicate into postive and negative outcomes.
*/
@RequiredArgsConstructor
@FieldDefaults(makeFinal = true, level = AccessLevel.PROTECTED)
public class StreamForkerUtil<T> implements Consumer<T> {
Predicate<T> predicate;
Consumer<T> positiveConsumer;
Consumer<T> negativeConsumer;
@Override
public void accept(T t) {
(predicate.test(t) ? positiveConsumer : negativeConsumer).accept(t);
}
}
Как насчет:
Supplier<Stream<Integer>> randomIntsStreamSupplier =
() -> (new Random()).ints(0, 2).boxed();
Stream<Integer> tails =
randomIntsStreamSupplier.get().filter(x->x.equals(0));
Stream<Integer> heads =
randomIntsStreamSupplier.get().filter(x->x.equals(1));