Соберите последовательные пары из потока

Учитывая поток, такой как { 0, 1, 2, 3, 4 },

как я могу наиболее элегантно преобразовать его в заданную форму:

{ new Pair(0, 1), new Pair(1, 2), new Pair(2, 3), new Pair(3, 4) }

(при условии, конечно, я определил класс Pair)?

Изменить: Это не строго о целых или примитивных потоков. Ответ должен быть общим для потока любого типа.

22 ответа

Решение

Моя библиотека StreamEx, которая расширяет стандартные потоки, обеспечивает pairMap метод для всех типов потоков. Для примитивных потоков это не меняет тип потока, но может использоваться для некоторых вычислений. Чаще всего используется для расчета различий:

int[] pairwiseDiffs = IntStreamEx.of(input).pairMap((a, b) -> (b-a)).toArray();

Для потока объекта вы можете создать любой другой тип объекта. Моя библиотека не предоставляет никаких новых видимых пользователем структур данных, таких как Pair (это часть концепции библиотеки). Однако, если у вас есть свой Pair Класс и хотите использовать его, вы можете сделать следующее:

Stream<Pair> pairs = IntStreamEx.of(input).boxed().pairMap(Pair::new);

Или если у вас уже есть некоторые Stream:

Stream<Pair> pairs = StreamEx.of(stream).pairMap(Pair::new);

Эта функциональность реализована с использованием пользовательского сплитератора. Он имеет довольно низкие накладные расходы и может хорошо распараллеливать. Конечно, он работает с любым источником потока, а не только со списком / массивом произвольного доступа, как и многие другие решения. Во многих тестах он работает действительно хорошо. Вот эталонный тест JMH, где мы находим все входные значения, предшествующие большему значению, используя различные подходы (см. Этот вопрос).

Библиотека потоков Java 8 в первую очередь предназначена для разделения потоков на более мелкие порции для параллельной обработки, поэтому этапы конвейера с сохранением состояния весьма ограничены, и такие вещи, как получение индекса текущего элемента потока и доступ к смежным элементам потока, не поддерживаются.

Типичный способ решения этих проблем, с некоторыми ограничениями, конечно, состоит в том, чтобы управлять потоком по индексам и полагаться на обработку значений в некоторой структуре данных с произвольным доступом, такой как ArrayList, из которой можно извлечь элементы. Если значения были в arrayListможно создать пары по запросу, выполнив что-то вроде этого:

    IntStream.range(1, arrayList.size())
             .mapToObj(i -> new Pair(arrayList.get(i-1), arrayList.get(i)))
             .forEach(System.out::println);

Конечно, ограничение заключается в том, что входные данные не могут быть бесконечным потоком. Этот трубопровод может быть запущен параллельно.

Вы можете сделать это с помощью метода Stream.reduce() (я не видел других ответов, использующих эту технику).

public static <T> List<Pair<T, T>> consecutive(List<T> list) {
    List<Pair<T, T>> pairs = new LinkedList<>();
    list.stream().reduce((a, b) -> {
        pairs.add(new Pair<>(a, b));
        return b;
    });
    return pairs;
}

Это не элегантно, это хакерское решение, но работает для бесконечных потоков

Stream<Pair> pairStream = Stream.iterate(0, (i) -> i + 1).map( // natural numbers
    new Function<Integer, Pair>() {
        Integer previous;

        @Override
        public Pair apply(Integer integer) {
            Pair pair = null;
            if (previous != null) pair = new Pair(previous, integer);
            previous = integer;
            return pair;
        }
    }).skip(1); // drop first null

Теперь вы можете ограничить свой поток до желаемой длины

pairStream.limit(1_000_000).forEach(i -> System.out.println(i));

PS Я надеюсь, что есть лучшее решение, что-то вроде clojure (partition 2 1 stream)

Я реализовал упаковщик разделителя, который принимает каждый n элементы T от оригинального сплитератора и производит List<T>:

public class ConsecutiveSpliterator<T> implements Spliterator<List<T>> {

    private final Spliterator<T> wrappedSpliterator;

    private final int n;

    private final Deque<T> deque;

    private final Consumer<T> dequeConsumer;

    public ConsecutiveSpliterator(Spliterator<T> wrappedSpliterator, int n) {
        this.wrappedSpliterator = wrappedSpliterator;
        this.n = n;
        this.deque = new ArrayDeque<>();
        this.dequeConsumer = deque::addLast;
    }

    @Override
    public boolean tryAdvance(Consumer<? super List<T>> action) {
        deque.pollFirst();
        fillDeque();
        if (deque.size() == n) {
            List<T> list = new ArrayList<>(deque);
            action.accept(list);
            return true;
        } else {
            return false;
        }
    }

    private void fillDeque() {
        while (deque.size() < n && wrappedSpliterator.tryAdvance(dequeConsumer))
            ;
    }

    @Override
    public Spliterator<List<T>> trySplit() {
        return null;
    }

    @Override
    public long estimateSize() {
        return wrappedSpliterator.estimateSize();
    }

    @Override
    public int characteristics() {
        return wrappedSpliterator.characteristics();
    }
}

Следующий метод может быть использован для создания последовательного потока:

public <E> Stream<List<E>> consecutiveStream(Stream<E> stream, int n) {
    Spliterator<E> spliterator = stream.spliterator();
    Spliterator<List<E>> wrapper = new ConsecutiveSpliterator<>(spliterator, n);
    return StreamSupport.stream(wrapper, false);
}

Пример использования:

consecutiveStream(Stream.of(0, 1, 2, 3, 4, 5), 2)
    .map(list -> new Pair(list.get(0), list.get(1)))
    .forEach(System.out::println);

Вы можете сделать это в циклоп-реакции (я помогаю этой библиотеке), используя оператор скольжения.

  LazyFutureStream.of( 0, 1, 2, 3, 4 )
                  .sliding(2)
                  .map(Pair::new);

Или же

   ReactiveSeq.of( 0, 1, 2, 3, 4 )
                  .sliding(2)
                  .map(Pair::new);

Предполагая, что конструктор Pair может принимать коллекцию с 2 элементами.

Если вы хотите сгруппировать по 4 и увеличить на 2, это также поддерживается.

     ReactiveSeq.rangeLong( 0L,Long.MAX_VALUE)
                .sliding(4,2)
                .forEach(System.out::println);

Эквивалентные статические методы для создания скользящего представления над java.util.stream.Stream также предоставляются в классе StreamUtils циклопов-потоков.

       StreamUtils.sliding(Stream.of(1,2,3,4),2)
                  .map(Pair::new);

Примечание:- для однопоточной операции ReactiveSeq будет более подходящим. LazyFutureStream расширяет ReactiveSeq, но в основном предназначен для параллельного / параллельного использования (это поток фьючерсов).

LazyFutureStream расширяет ReactiveSeq, который расширяет Seq от удивительного jOOλ (который расширяет java.util.stream.Stream), поэтому решения, представленные Lukas, также будут работать с любым типом Stream. Для всех, кто интересуется, основными различиями между операторами окна / скольжения являются очевидный компромисс между мощностью и сложностью и пригодность для использования с бесконечными потоками (скольжение не потребляет поток, а буферизирует по мере его прохождения).

Streams.zip(..)доступен в Гуаве для тех, кто от него зависит.

Пример:

Streams.zip(list.stream(),
            list.stream().skip(1),
            (a, b) -> System.out.printf("%s %s\n", a, b));

Библиотека Proton-Pack обеспечивает оконную функциональность. Учитывая класс Pair и Stream, вы можете сделать это следующим образом:

Stream<Integer> st = Stream.iterate(0 , x -> x + 1);
Stream<Pair<Integer, Integer>> pairs = StreamUtils.windowed(st, 2, 1)
                                                  .map(l -> new Pair<>(l.get(0), l.get(1)))
                                                  .moreStreamOps(...);

Теперь pairs поток содержит:

(0, 1)
(1, 2)
(2, 3)
(3, 4)
(4, ...) and so on

Поиск последовательных пар

Если вы готовы использовать стороннюю библиотеку и не нуждаетесь в параллелизме, тогда jOOλ предлагает функции окна в стиле SQL следующим образом

System.out.println(
Seq.of(0, 1, 2, 3, 4)
   .window()
   .filter(w -> w.lead().isPresent())
   .map(w -> tuple(w.value(), w.lead().get())) // alternatively, use your new Pair() class
   .toList()
);

Уступая

[(0, 1), (1, 2), (2, 3), (3, 4)]

lead() Функция обращается к следующему значению в порядке обхода из окна.

Поиск последовательных троек / четверок / n-кортежей

В комментариях был задан вопрос о более общем решении, в котором следует собирать не пары, а n-кортежи (или, возможно, списки). Вот, таким образом, альтернативный подход:

int n = 3;

System.out.println(
Seq.of(0, 1, 2, 3, 4)
   .window(0, n - 1)
   .filter(w -> w.count() == n)
   .map(w -> w.window().toList())
   .toList()
);

Вывод списка списков

[[0, 1, 2], [1, 2, 3], [2, 3, 4]]

Без filter(w -> w.count() == n)результат будет

[[0, 1, 2], [1, 2, 3], [2, 3, 4], [3, 4], [4]]

Отказ от ответственности: я работаю на компанию за JOOλ

Операция в основном с состоянием, поэтому не совсем то, для чего предназначены потоки - см. Раздел "Поведение без состояния" в javadoc:

Наилучший подход - избегать поведенческих параметров с сохранением состояния для потоковой передачи операций полностью

Одним из решений здесь является введение состояния в ваш поток через внешний счетчик, хотя он будет работать только с последовательным потоком.

public static void main(String[] args) {
    Stream<String> strings = Stream.of("a", "b", "c", "c");
    AtomicReference<String> previous = new AtomicReference<>();
    List<Pair> collect = strings.map(n -> {
                            String p = previous.getAndSet(n);
                            return p == null ? null : new Pair(p, n);
                        })
                        .filter(p -> p != null)
                        .collect(toList());
    System.out.println(collect);
}


static class Pair<T> {
    private T left, right;
    Pair(T left, T right) { this.left = left; this.right = right; }
    @Override public String toString() { return "{" + left + "," + right + '}'; }
}

Мы можем использовать RxJava (очень мощная библиотека реактивных расширений)

IntStream intStream  = IntStream.iterate(1, n -> n + 1);

Observable<List<Integer>> pairObservable = Observable.from(intStream::iterator).buffer(2,1);

pairObservable.take(10).forEach(b -> {
            b.forEach(n -> System.out.println(n));
            System.out.println();
        });

Оператор буфера преобразует Observable, который испускает элементы, в Observable, который испускает буферизованные коллекции этих элементов.

Вы можете использовать Flux:

      Stream<String> someStream = Stream.of("A", "B", "C", "D");
Flux<String> someFlux = Flux.fromStream(someStream);

someFlux.zipWith(someFlux.skip(1))
    .map(t -> t.getT1().concat(t.getT2()))
    .subscribe(System.out::println);

Результат будет:

      AB
BC
CD

Я наконец-то нашел способ обмануть Stream.reduce, чтобы иметь возможность аккуратно работать с парами значений; Есть множество вариантов использования, которые требуют этой возможности, которая не появляется в JDK 8:

public static int ArithGeo(int[] arr) {
    //Geometric
    List<Integer> diffList = new ArrayList<>();
    List<Integer> divList = new ArrayList<>();
    Arrays.stream(arr).reduce((left, right) -> {
        diffList.add(right-left);
        divList.add(right/left);
        return right;
    });
    //Arithmetic
    if(diffList.stream().distinct().count() == 1) {
        return 1;
    }
    //Geometric
    if(divList.stream().distinct().count() == 1) {
        return 2;
    }
    return -1;
}

Уловка, которую я использую, - этоправо на возврат; заявление.

Решения здесь кажутся немного сложными или зависят от сторонних библиотек. Эту проблему можно решить с помощью промежуточного потока, который собирает пары:

      public static <T> Stream<List<T>> pairs(Stream<T> stream) {
    Iterator<T> iterator = stream.iterator();
    if (iterator.hasNext()) {
        T first = iterator.next();
        if (iterator.hasNext()) {
            return Stream.iterate(
                List.of(first, iterator.next()),
                prev -> iterator.hasNext() ? List.of(prev.get(1), iterator.next()) : null)
                .takeWhile(prev -> prev != null);
        }
    }
    return Stream.empty();
}

Примеры:

      pairs(Stream.of()).toList();      // []
pairs(Stream.of(1)).toList();     // []
pairs(Stream.of(1,2)).toList();   // [[1, 2]]
pairs(Stream.of(1,2,3)).toList(); // [[1, 2], [2, 3]]
pairs(Stream.of("a","b","c","d")).toList();  // [[a, b], [b, c], [c, d]]

В этом решенииStream.iterateиспользует аккумулятор почти так же, какreduce, за исключением того, что он создает промежуточный поток, а не терминальную операцию. Так что лень и бесконечные потоки поддерживаются.

В вашем случае я написал бы свою собственную IntFunction, которая отслеживает последние переданные int, и использую ее для сопоставления исходного IntStream.

import java.util.function.IntFunction;
import java.util.stream.IntStream;

public class PairFunction implements IntFunction<PairFunction.Pair> {

  public static class Pair {

    private final int first;
    private final int second;

    public Pair(int first, int second) {
      this.first = first;
      this.second = second;
    }

    @Override
    public String toString() {
      return "[" + first + "|" + second + "]";
    }
  }

  private int last;
  private boolean first = true;

  @Override
  public Pair apply(int value) {
    Pair pair = !first ? new Pair(last, value) : null;
    last = value;
    first = false;
    return pair;
  }

  public static void main(String[] args) {

    IntStream intStream = IntStream.of(0, 1, 2, 3, 4);
    final PairFunction pairFunction = new PairFunction();
    intStream.mapToObj(pairFunction)
        .filter(p -> p != null) // filter out the null
        .forEach(System.out::println); // display each Pair

  }

}

Для расчета последовательных различий во времени (значения х) временного ряда я использую stream"s collect(...) метод:

final List< Long > intervals = timeSeries.data().stream()
                    .map( TimeSeries.Datum::x )
                    .collect( DifferenceCollector::new, DifferenceCollector::accept, DifferenceCollector::combine )
                    .intervals();

Где DifferenceCollector выглядит примерно так:

public class DifferenceCollector implements LongConsumer
{
    private final List< Long > intervals = new ArrayList<>();
    private Long lastTime;

    @Override
    public void accept( final long time )
    {
        if( Objects.isNull( lastTime ) )
        {
            lastTime = time;
        }
        else
        {
            intervals.add( time - lastTime );
            lastTime = time;
        }
    }

    public void combine( final DifferenceCollector other )
    {
        intervals.addAll( other.intervals );
        lastTime = other.lastTime;
    }

    public List< Long > intervals()
    {
        return intervals;
    }
}

Возможно, вы могли бы изменить это в соответствии с вашими потребностями.

Это интересная проблема. Моя гибридная попытка ниже всякой пользы?

public static void main(String[] args) {
    List<Integer> list = Arrays.asList(1, 2, 3);
    Iterator<Integer> first = list.iterator();
    first.next();
    if (first.hasNext())
        list.stream()
        .skip(1)
        .map(v -> new Pair(first.next(), v))
        .forEach(System.out::println);
}

Я считаю, что он не поддается параллельной обработке и, следовательно, может быть дисквалифицирован.

Как отметили другие, в силу характера проблемы требуется определенная государственность.

Я столкнулся с подобной проблемой, в которой я хотел, по сути, функцию Oracle SQL LEAD. Моя попытка реализовать это ниже.

/**
 * Stream that pairs each element in the stream with the next subsequent element.
 * The final pair will have only the first item, the second will be null.
 */
<T> Spliterator<Pair<T>> lead(final Stream<T> stream)
{
    final Iterator<T> input = stream.sequential().iterator();

    final Iterable<Pair<T>> iterable = () ->
    {
        return new Iterator<Pair<T>>()
        {
            Optional<T> current = getOptionalNext(input);

            @Override
            public boolean hasNext()
            {
                return current.isPresent();
            }

            @Override
            public Pair<T> next()
            {
                Optional<T> next = getOptionalNext(input);
                final Pair<T> pair = next.isPresent()
                    ? new Pair(current.get(), next.get())
                    : new Pair(current.get(), null);
                current = next;

                return pair;
            }
        };
    };

    return iterable.spliterator();
}

private <T> Optional<T> getOptionalNext(final Iterator<T> iterator)
{
    return iterator.hasNext()
        ? Optional.of(iterator.next())
        : Optional.empty();
}

Вы можете добиться этого, используя ограниченную очередь для хранения элементов, которые проходят через поток (что основано на идее, которую я подробно описал здесь: возможно ли получить следующий элемент в потоке?)

Приведенный ниже пример сначала определяет экземпляр класса BoundedQueue, который будет хранить элементы, проходящие через поток (если вам не нравится идея расширения LinkedList, обратитесь к упомянутой выше ссылке для альтернативного и более общего подхода). Позже вы просто объедините два последующих элемента в экземпляр Pair:

public class TwoSubsequentElems {
  public static void main(String[] args) {
    List<Integer> input = new ArrayList<Integer>(asList(0, 1, 2, 3, 4));

    class BoundedQueue<T> extends LinkedList<T> {
      public BoundedQueue<T> save(T curElem) {
        if (size() == 2) { // we need to know only two subsequent elements
          pollLast(); // remove last to keep only requested number of elements
        }

        offerFirst(curElem);

        return this;
      }

      public T getPrevious() {
        return (size() < 2) ? null : getLast();
      }

      public T getCurrent() {
        return (size() == 0) ? null : getFirst();
      }
    }

    BoundedQueue<Integer> streamHistory = new BoundedQueue<Integer>();

    final List<Pair<Integer>> answer = input.stream()
      .map(i -> streamHistory.save(i))
      .filter(e -> e.getPrevious() != null)
      .map(e -> new Pair<Integer>(e.getPrevious(), e.getCurrent()))
      .collect(Collectors.toList());

    answer.forEach(System.out::println);
  }
}

Элегантным решением было бы использовать zip. Что-то вроде:

List<Integer> input = Arrays.asList(0, 1, 2, 3, 4);
Stream<Pair> pairStream = Streams.zip(input.stream(),
                                      input.stream().substream(1),
                                      (a, b) -> new Pair(a, b)
);

Это довольно лаконично и элегантно, однако в качестве входных данных используется список. Источник бесконечного потока не может быть обработан таким образом.

Другая (гораздо более неприятная) проблема заключается в том, что zip вместе со всем классом Streams в последнее время был удален из API. Приведенный выше код работает только с b95 или более ранними версиями. Так что с последним JDK я бы сказал, что нет элегантного решения в стиле FP, и сейчас мы можем только надеяться, что zip будет каким-то образом введен в API.

Я согласен с @aepurniet, но вместо map вы должны использовать mapToObj

range(0, 100).mapToObj((i) -> new Pair(i, i+1)).forEach(System.out::println);

Запустить for цикл, который работает от 0 до length-1 вашего потока

for(int i = 0 ; i < stream.length-1 ; i++)
{
    Pair pair = new Pair(stream[i], stream[i+1]);
    // then add your pair to an array
}
Другие вопросы по тегам