Являются ли преобразователи Clojure той же концепцией, что и промежуточные операции над потоками в Java?
Когда я узнал о преобразователях в Clojure, меня внезапно поразило то, о чем они мне напомнили: потоки Java 8!
Преобразователи являются составными алгоритмическими преобразованиями. Они не зависят от контекста своих входных и выходных источников и определяют только сущность преобразования в терминах отдельного элемента.
Поток не является структурой данных, которая хранит элементы; вместо этого он передает элементы из источника, такого как структура данных, массив, функция генератора или канал ввода / вывода, через конвейер вычислительных операций.
Clojure:
(def xf
(comp
(filter odd?)
(map inc)
(take 5)))
(println
(transduce xf + (range 100))) ; => 30
(println
(into [] xf (range 100))) ; => [2 4 6 8 10]
Джава:
// Purposely using Function and boxed primitive streams (instead of
// UnaryOperator<LongStream>) in order to keep it general.
Function<Stream<Long>, Stream<Long>> xf =
s -> s.filter(n -> n % 2L == 1L)
.map(n -> n + 1L)
.limit(5L);
System.out.println(
xf.apply(LongStream.range(0L, 100L).boxed())
.reduce(0L, Math::addExact)); // => 30
System.out.println(
xf.apply(LongStream.range(0L, 100L).boxed())
.collect(Collectors.toList())); // => [2, 4, 6, 8, 10]
Помимо различий в статической / динамической типизации, они кажутся мне очень похожими по назначению и использованию.
Является ли аналогия с трансформациями потоков Java разумным подходом к преобразователям? Если нет, то как он ошибочен или как они различаются по концепции (не говоря о реализации)?
1 ответ
Основное отличие состоит в том, что набор глаголов (операций) как-то закрыт для потоков, а открыт для преобразователей: попробуйте, например, реализовать partition
на потоках, это чувствует себя немного вторым классом:
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;
import java.util.stream.Stream.Builder;
public class StreamUtils {
static <T> Stream<T> delay(final Supplier<Stream<T>> thunk) {
return Stream.of((Object) null).flatMap(x -> thunk.get());
}
static class Partitioner<T> implements Function<T, Stream<Stream<T>>> {
final Function<T, ?> f;
Object prev;
Builder<T> sb;
public Partitioner(Function<T, ?> f) {
this.f = f;
}
public Stream<Stream<T>> apply(T t) {
Object tag = f.apply(t);
if (sb != null && prev.equals(tag)) {
sb.accept(t);
return Stream.empty();
}
Stream<Stream<T>> partition = sb == null ? Stream.empty() : Stream.of(sb.build());
sb = Stream.builder();
sb.accept(t);
prev = tag;
return partition;
}
Stream<Stream<T>> flush() {
return sb == null ? Stream.empty() : Stream.of(sb.build());
}
}
static <T> Stream<Stream<T>> partitionBy(Stream<T> in, Function<T, ?> f) {
Partitioner<T> partitioner = new Partitioner<>(f);
return Stream.concat(in.flatMap(partitioner), delay(() -> partitioner.flush()));
}
}
Также как последовательности и редукторы, когда вы преобразуете, вы не создаете "большие" вычисления, вы создаете "больший" источник.
Чтобы иметь возможность проходить вычисления, вы представили xf
функция от Stream до Stream, чтобы поднять операции от методов до объектов первого класса (чтобы развязать их от источника). Тем самым вы создали преобразователь, хотя и со слишком большим интерфейсом.
Ниже приведена более общая версия приведенного выше кода для применения любого (clojure) преобразователя к потоку:
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;
import java.util.stream.Stream.Builder;
import clojure.lang.AFn;
import clojure.lang.IFn;
import clojure.lang.Reduced;
public class StreamUtils {
static <T> Stream<T> delay(final Supplier<Stream<T>> thunk) {
return Stream.of((Object) null).flatMap(x -> thunk.get());
}
static class Transducer implements Function {
IFn rf;
public Transducer(IFn xf) {
rf = (IFn) xf.invoke(new AFn() {
public Object invoke(Object acc) {
return acc;
}
public Object invoke(Object acc, Object item) {
((Builder<Object>) acc).accept(item);
return acc;
}
});
}
public Stream<?> apply(Object t) {
if (rf == null) return Stream.empty();
Object ret = rf.invoke(Stream.builder(), t);
if (ret instanceof Reduced) {
Reduced red = (Reduced) ret;
Builder<?> sb = (Builder<?>) red.deref();
return Stream.concat(sb.build(), flush());
}
return ((Builder<?>) ret).build();
}
Stream<?> flush() {
if (rf == null) return Stream.empty();
Builder<?> sb = (Builder<?>) rf.invoke(Stream.builder());
rf = null;
return sb.build();
}
}
static <T> Stream<?> withTransducer(Stream<T> in, IFn xf) {
Transducer transducer = new Transducer(xf);
return Stream.concat(in.flatMap(transducer), delay(() -> transducer.flush()));
}
}
Еще одно важное отличие, которое я вижу, заключается в том, что преобразователи Clojure можно компоновать. У меня часто бывает ситуация, когда мои потоковые конвейеры немного длиннее, чем в вашем примере, где есть только некоторые промежуточные шаги, которые я мог бы повторно использовать в другом месте, например:
someStream
.map(...)
.filter(...)
.map(...) // <- gee, there are at least two other
.filter(...) // <- pipelines where I could use the functionality
.map(...) // <- of just these three steps!
.filter(...)
.collect(...)
Я не нашел разумного способа добиться этого. Я хотел бы иметь что-то вроде этого:
Transducer<Integer,String> smallTransducer = s -> s.map(...); // usable in a stream Integer -> String
Transducer<String,MyClass> otherTransducer = s -> s.filter(...).map(...); // stream String -> MyClass
Transducer<Integer,MyClass> combinedTransducer = smallTransducer.then(otherTransducer); // compose transducers, to get an Integer -> MyClass transducer
а затем используйте его так:
someStream
.map(...)
.filter(...)
.transduce(smallTransducer)
.transduce(otherTransducer)
.filter(...)
.collect(...)
// or
someStream
.map(...)
.filter(...)
.transduce(combinedTransducer)
.filter(...)
.collect(...)