Декартово произведение потоков в Java 8 как поток (используя только потоки)

Я хотел бы создать метод, который создает поток элементов, которые являются декартовыми произведениями из нескольких заданных потоков (объединяются в один и тот же тип в конце с помощью бинарного оператора). Обратите внимание, что как аргументы, так и результаты являются потоками, а не коллекциями.

Например, для двух потоков {A, B} и {X, Y} мне бы хотелось, чтобы он генерировал поток значений {AX, AY, BX, BY} (для объединения строк используется простая конкатенация). До сих пор я придумал этот код:

private static <T> Stream<T> cartesian(BinaryOperator<T> aggregator, Stream<T>... streams) {
    Stream<T> result = null;

    for (Stream<T> stream : streams) {
        if (result == null) {
            result = stream;
        } else {
            result = result.flatMap(m -> stream.map(n -> aggregator.apply(m, n)));
        }
    }

    return result;
}

Это мой желаемый вариант использования:

Stream<String> result = cartesian(
  (a, b) -> a + b, 
  Stream.of("A", "B"), 
  Stream.of("X", "Y")
);

System.out.println(result.collect(Collectors.toList()));

Ожидаемый результат: AX, AY, BX, BY,

Другой пример:

Stream<String> result = cartesian(
  (a, b) -> a + b, 
  Stream.of("A", "B"), 
  Stream.of("K", "L"), 
  Stream.of("X", "Y")
);

Ожидаемый результат: AKX, AKY, ALX, ALY, BKX, BKY, BLX, BLY,

Однако, если я запускаю код, я получаю эту ошибку:

IllegalStateException: поток уже был обработан или закрыт

Где используется поток? По FlatMap? Это легко исправить?

3 ответа

Решение

Передача потоков в вашем примере никогда не бывает лучше, чем передача списков:

private static <T> Stream<T> cartesian(BinaryOperator<T> aggregator, List<T>... lists) {
    ...
}

И используйте это так:

Stream<String> result = cartesian(
  (a, b) -> a + b, 
  Arrays.asList("A", "B"), 
  Arrays.asList("K", "L"), 
  Arrays.asList("X", "Y")
);

В обоих случаях вы создаете неявный массив из varargs и используете его в качестве источника данных, поэтому лень является мнимой. Ваши данные на самом деле хранятся в массивах.

В большинстве случаев результирующий поток декартовых продуктов намного длиннее входных, поэтому практически нет причин делать входные данные ленивыми. Например, имея пять списков из пяти элементов (всего 25), вы получите результирующий поток из 3125 элементов. Поэтому сохранение 25 элементов в памяти - не очень большая проблема. На самом деле в большинстве практических случаев они уже хранятся в памяти.

Чтобы сформировать поток декартовых продуктов, необходимо постоянно "перематывать" все потоки (кроме первого). Для перемотки потоки должны иметь возможность извлекать исходные данные снова и снова, либо буферизуя их каким-либо образом (что вам не нравится), либо снова извлекая их из источника (совокупность, массив, файл, сеть, случайные числа и т. Д.).) и выполните снова и снова все промежуточные операции. Если ваши исходные и промежуточные операции выполняются медленно, то отложенное решение может быть намного медленнее, чем решение буферизации. Если ваш источник не может произвести данные снова (например, генератор случайных чисел, который не может произвести те же числа, что и ранее), ваше решение будет неверным.

Тем не менее, вполне ленивое решение возможно. Просто используйте не потоки, а поставщиков потоков:

private static <T> Stream<T> cartesian(BinaryOperator<T> aggregator,
                                       Supplier<Stream<T>>... streams) {
    return Arrays.stream(streams)
        .reduce((s1, s2) -> 
            () -> s1.get().flatMap(t1 -> s2.get().map(t2 -> aggregator.apply(t1, t2))))
        .orElse(Stream::empty).get();
}

Решение интересно, так как мы создаем и сокращаем поток поставщиков, чтобы получить конечного поставщика и, наконец, назвать его. Использование:

Stream<String> result = cartesian(
          (a, b) -> a + b, 
          () -> Stream.of("A", "B"), 
          () -> Stream.of("K", "L"), 
          () -> Stream.of("X", "Y")
        );
result.forEach(System.out::println);

stream потребляется в flatMap операция во второй итерации. Таким образом, вы должны создавать новый поток каждый раз, когда вы map твой результат. Поэтому вы должны собрать stream заранее, чтобы получить новый поток в каждой итерации.

private static <T> Stream<T> cartesian(BiFunction<T, T, T> aggregator, Stream<T>... streams) {
    Stream<T> result = null;
    for (Stream<T> stream : streams) {
        if (result == null) {
            result = stream;
        } else {
            Collection<T> s = stream.collect(Collectors.toList());
            result = result.flatMap(m -> s.stream().map(n -> aggregator.apply(m, n)));
        }
    }
    return result;
}

Или даже короче

private static <T> Stream<T> cartesian(BiFunction<T, T, T> aggregator, Stream<T>... streams) {
    return Arrays.stream(streams).reduce((r, s) -> {
        List<T> collect = s.collect(Collectors.toList());
        return r.flatMap(m -> collect.stream().map(n -> aggregator.apply(m, n)));
    }).orElse(Stream.empty());
}

Вы можете создать метод, который возвращает поток List<T>объектов и не агрегирует их. Алгоритм тот же: на каждом шаге собирайте элементы второго потока в список, а затем добавляйте их к элементам первого потока.

Агрегатор вне метода.

      @SuppressWarnings("unchecked")
public static <T> Stream<List<T>> cartesianProduct(Stream<T>... streams) {
    // incorrect incoming data
    if (streams == null) return Stream.empty();
    return Arrays.stream(streams)
            // non-null streams
            .filter(Objects::nonNull)
            // represent each list element as SingletonList<Object>
            .map(stream -> stream.map(Collections::singletonList))
            // summation of pairs of inner lists
            .reduce((stream1, stream2) -> {
                // list of lists from second stream
                List<List<T>> list2 = stream2.collect(Collectors.toList());
                // append to the first stream
                return stream1.flatMap(inner1 -> list2.stream()
                        // combinations of inner lists
                        .map(inner2 -> {
                            List<T> list = new ArrayList<>();
                            list.addAll(inner1);
                            list.addAll(inner2);
                            return list;
                        }));
            }).orElse(Stream.empty());
}
      public static void main(String[] args) {
    Stream<String> stream1 = Stream.of("A", "B");
    Stream<String> stream2 = Stream.of("K", "L");
    Stream<String> stream3 = Stream.of("X", "Y");
    @SuppressWarnings("unchecked")
    Stream<List<String>> stream4 = cartesianProduct(stream1, stream2, stream3);
    // output
    stream4.map(list -> String.join("", list)).forEach(System.out::println);
}

String.join в данном случае является своего рода агрегатором.

Выход:

      AKX
AKY
ALX
ALY
BKX
BKY
BLX
BLY

См. Также: Поток декартова произведения других потоков, каждый элемент в виде списка?

Другие вопросы по тегам