Как я могу создать универсальный пейджинговый сплитератор?
Я хотел бы иметь возможность обрабатывать чтение потока Java из источника, который должен быть доступен на страницах. В качестве первого подхода я реализовал итератор подкачки, который просто запрашивал страницы, когда на текущей странице заканчивались элементы, а затем использовал StreamSupport.stream(iterator, false)
чтобы получить ручку потока через итератор.
Поскольку я обнаружил, что мои страницы довольно дороги для извлечения, я хотел бы получить доступ к страницам через параллельный поток. В этот момент я обнаружил, что параллелизм, обеспечиваемый моим наивным подходом, не существует из-за реализации сплитератора, которую java предоставляет непосредственно от итератора. Поскольку я на самом деле знаю довольно много об элементах, которые я хотел бы пройти (я знаю общее количество результатов после запроса первой страницы, а источник поддерживает смещение и ограничение), я думаю, что должна быть возможность реализовать свой собственный сплитератор, который достигает реальный параллелизм (как в работе над элементами страницы, так и в запросах страницы).
Мне удалось достичь параллелизма "работа над элементами" довольно легко, но в моей первоначальной реализации запрос страницы выполнялся только самым верхним сплитератором и, таким образом, не извлекал выгоду из разделения работы. предлагаемый реализацией fork-join.
Как я могу написать сплитератор, который достигает обеих этих целей?
Для справки я приведу то, что я сделал до сих пор (я знаю, что это не разделяет запросы должным образом).
public final class PagingSourceSpliterator<T> implements Spliterator<T> {
public static final long DEFAULT_PAGE_SIZE = 100;
private Page<T> result;
private Iterator<T> results;
private boolean needsReset = false;
private final PageProducer<T> generator;
private long offset = 0L;
private long limit = DEFAULT_PAGE_SIZE;
public PagingSourceSpliterator(PageProducer<T> generator) {
this.generator = generator;
}
public PagingSourceSpliterator(long pageSize, PageProducer<T> generator) {
this.generator = generator;
this.limit = pageSize;
}
@Override
public boolean tryAdvance(Consumer<? super T> action) {
if (hasAnotherElement()) {
if (!results.hasNext()) {
loadPageAndPrepareNextPaging();
}
if (results.hasNext()) {
action.accept(results.next());
return true;
}
}
return false;
}
@Override
public Spliterator<T> trySplit() {
// if we know there's another page, go ahead and hand off whatever
// remains of this spliterator as a new spliterator for other
// threads to work on, and then mark that next time something is
// requested from this spliterator it needs to be reset to the head
// of the next page
if (hasAnotherPage()) {
Spliterator<T> other = result.getPage().spliterator();
needsReset = true;
return other;
} else {
return null;
}
}
@Override
public long estimateSize() {
if(limit == 0) {
return 0;
}
ensureStateIsUpToDateEnoughToAnswerInquiries();
return result.getTotalResults();
}
@Override
public int characteristics() {
return IMMUTABLE | ORDERED | DISTINCT | NONNULL | SIZED | SUBSIZED;
}
private boolean hasAnotherElement() {
ensureStateIsUpToDateEnoughToAnswerInquiries();
return isBound() && (results.hasNext() || hasAnotherPage());
}
private boolean hasAnotherPage() {
ensureStateIsUpToDateEnoughToAnswerInquiries();
return isBound() && (result.getTotalResults() > offset);
}
private boolean isBound() {
return Objects.nonNull(results) && Objects.nonNull(result);
}
private void ensureStateIsUpToDateEnoughToAnswerInquiries() {
ensureBound();
ensureResetIfNecessary();
}
private void ensureBound() {
if (!isBound()) {
loadPageAndPrepareNextPaging();
}
}
private void ensureResetIfNecessary() {
if(needsReset) {
loadPageAndPrepareNextPaging();
needsReset = false;
}
}
private void loadPageAndPrepareNextPaging() {
// keep track of the overall result so that we can reference the original list and total size
this.result = generator.apply(offset, limit);
// make sure that the iterator we use to traverse a single page removes
// results from the underlying list as we go so that we can simply pass
// off the list spliterator for the trySplit rather than constructing a
// new kind of spliterator for what remains.
this.results = new DelegatingIterator<T>(result.getPage().listIterator()) {
@Override
public T next() {
T next = super.next();
this.remove();
return next;
}
};
// update the paging for the next request and inquiries prior to the next request
// we use the page of the actual result set instead of the limit in case the limit
// was not respected exactly.
this.offset += result.getPage().size();
}
public static class DelegatingIterator<T> implements Iterator<T> {
private final Iterator<T> iterator;
public DelegatingIterator(Iterator<T> iterator) {
this.iterator = iterator;
}
@Override
public boolean hasNext() {
return iterator.hasNext();
}
@Override
public T next() {
return iterator.next();
}
@Override
public void remove() {
iterator.remove();
}
@Override
public void forEachRemaining(Consumer<? super T> action) {
iterator.forEachRemaining(action);
}
}
}
И источник моих страниц:
public interface PageProducer<T> extends BiFunction<Long, Long, Page<T>> {
}
И страница:
public final class Page<T> {
private long totalResults;
private final List<T> page = new ArrayList<>();
public long getTotalResults() {
return totalResults;
}
public List<T> getPage() {
return page;
}
public Page setTotalResults(long totalResults) {
this.totalResults = totalResults;
return this;
}
public Page setPage(List<T> results) {
this.page.clear();
this.page.addAll(results);
return this;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof Page)) {
return false;
}
Page<?> page1 = (Page<?>) o;
return totalResults == page1.totalResults && Objects.equals(page, page1.page);
}
@Override
public int hashCode() {
return Objects.hash(totalResults, page);
}
}
И пример получения потока с "медленной" подкачкой для тестирования
private <T> Stream<T> asSlowPagedSource(long pageSize, List<T> things) {
PageProducer<T> producer = (offset, limit) -> {
try {
Thread.sleep(5000L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
int beginIndex = offset.intValue();
int endIndex = Math.min(offset.intValue() + limit.intValue(), things.size());
return new Page<T>().setTotalResults(things.size())
.setPage(things.subList(beginIndex, endIndex));
};
return StreamSupport.stream(new PagingSourceSpliterator<>(pageSize, producer), true);
}
2 ответа
Основная причина, по которой ваш сплитератор не приближает вас к вашей цели, состоит в том, что он пытается разделить страницы, а не пространство элементов источника. Если вы знаете общее количество элементов и имеете источник, позволяющий извлекать страницу с помощью смещения и лимита, наиболее естественной формой сплитератора является инкапсуляция диапазона внутри этих элементов, например, с помощью смещения и лимита или конца. Затем разделение означает просто разделение этого диапазона, адаптацию смещения вашего сплитератора к позиции разделения и создание нового сплитератора, представляющего префикс, от "старого смещения" до позиции разделения.
Before splitting:
this spliterator: offset=x, end=y
After splitting:
this spliterator: offset=z, end=y
returned spliterator: offset=x, end=z
x <= z <= y
Тогда как в лучшем случае z
точно посередине между x
а также y
, чтобы получить сбалансированные разбиения, но в нашем случае мы немного адаптируем его для получения рабочих наборов, кратных размеру страницы.
Эта логика работает без необходимости извлечения страниц, поэтому, если вы откладываете извлечение страниц до момента, фреймворк хочет начать обход, то есть после разделения операции извлечения могут выполняться параллельно. Самым большим препятствием является тот факт, что вам нужно получить первую страницу, чтобы узнать общее количество элементов. Приведенное ниже решение отделяет эту первую выборку от остальных, упрощая реализацию. Конечно, он должен передать результат этой первой выборки страницы, который будет использован при первом обходе (в последовательном случае) или возвращен как первый префикс разделения, принимая в этот момент одно несбалансированное разделение, но не имея иметь дело с этим больше потом.
public class PagingSpliterator<T> implements Spliterator<T> {
public interface PageFetcher<T> {
List<T> fetchPage(long offset, long limit, LongConsumer totalSizeSink);
}
public static final long DEFAULT_PAGE_SIZE = 100;
public static <T> Stream<T> paged(PageFetcher<T> pageAccessor) {
return paged(pageAccessor, DEFAULT_PAGE_SIZE, false);
}
public static <T> Stream<T> paged(PageFetcher<T> pageAccessor,
long pageSize, boolean parallel) {
if(pageSize<=0) throw new IllegalArgumentException();
return StreamSupport.stream(() -> {
PagingSpliterator<T> pgSp
= new PagingSpliterator<>(pageAccessor, 0, 0, pageSize);
pgSp.danglingFirstPage
=spliterator(pageAccessor.fetchPage(0, pageSize, l -> pgSp.end=l));
return pgSp;
}, CHARACTERISTICS, parallel);
}
private static final int CHARACTERISTICS = IMMUTABLE|ORDERED|SIZED|SUBSIZED;
private final PageFetcher<T> supplier;
long start, end, pageSize;
Spliterator<T> currentPage, danglingFirstPage;
PagingSpliterator(PageFetcher<T> supplier,
long start, long end, long pageSize) {
this.supplier = supplier;
this.start = start;
this.end = end;
this.pageSize = pageSize;
}
public boolean tryAdvance(Consumer<? super T> action) {
for(;;) {
if(ensurePage().tryAdvance(action)) return true;
if(start>=end) return false;
currentPage=null;
}
}
public void forEachRemaining(Consumer<? super T> action) {
do {
ensurePage().forEachRemaining(action);
currentPage=null;
} while(start<end);
}
public Spliterator<T> trySplit() {
if(danglingFirstPage!=null) {
Spliterator<T> fp=danglingFirstPage;
danglingFirstPage=null;
start=fp.getExactSizeIfKnown();
return fp;
}
if(currentPage!=null)
return currentPage.trySplit();
if(end-start>pageSize) {
long mid=(start+end)>>>1;
mid=mid/pageSize*pageSize;
if(mid==start) mid+=pageSize;
return new PagingSpliterator<>(supplier, start, start=mid, pageSize);
}
return ensurePage().trySplit();
}
/**
* Fetch data immediately before traversing or sub-page splitting.
*/
private Spliterator<T> ensurePage() {
if(danglingFirstPage!=null) {
Spliterator<T> fp=danglingFirstPage;
danglingFirstPage=null;
currentPage=fp;
start=fp.getExactSizeIfKnown();
return fp;
}
Spliterator<T> sp = currentPage;
if(sp==null) {
if(start>=end) return Spliterators.emptySpliterator();
sp = spliterator(supplier.fetchPage(
start, Math.min(end-start, pageSize), l->{}));
start += sp.getExactSizeIfKnown();
currentPage=sp;
}
return sp;
}
/**
* Ensure that the sub-spliterator provided by the List is compatible with
* ours, i.e. is {@code SIZED | SUBSIZED}. For standard List implementations,
* the spliterators are, so the costs of dumping into an intermediate array
* in the other case is irrelevant.
*/
private static <E> Spliterator<E> spliterator(List<E> list) {
Spliterator<E> sp = list.spliterator();
if((sp.characteristics()&(SIZED|SUBSIZED))!=(SIZED|SUBSIZED))
sp=Spliterators.spliterator(
StreamSupport.stream(sp, false).toArray(), IMMUTABLE | ORDERED);
return sp;
}
public long estimateSize() {
if(currentPage!=null) return currentPage.estimateSize();
return end-start;
}
public int characteristics() {
return CHARACTERISTICS;
}
}
Он использует специализированный PageFetcher
функциональный интерфейс, который может быть реализован путем вызова accept
метод обратного вызова с результирующим общим размером и возвращением списка предметов. Сплитератор подкачки просто делегирует сплитератору списка для прохождения, и в случае, если параллелизм значительно выше, чем итоговое количество страниц, он может даже выиграть от разделения этих сплитераторов страниц, что подразумевает, что списки произвольного доступа, такие как ArrayList
, являются предпочтительным типом списка здесь.
Адаптация вашего примера кода для
private static <T> Stream<T> asSlowPagedSource(long pageSize, List<T> things) {
return PagingSpliterator.paged( (offset, limit, totalSizeSink) -> {
totalSizeSink.accept(things.size());
if(offset>things.size()) return Collections.emptyList();
int beginIndex = (int)offset;
assert beginIndex==offset;
int endIndex = Math.min(beginIndex+(int)limit, things.size());
System.out.printf("Page %6d-%6d:\t%s%n",
beginIndex, endIndex, Thread.currentThread());
// artificial slowdown
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
return things.subList(beginIndex, endIndex);
}, pageSize, true);
}
вы можете проверить это как
List<Integer> samples=IntStream.range(0, 555_000).boxed().collect(Collectors.toList());
List<Integer> result =asSlowPagedSource(10_000, samples) .collect(Collectors.toList());
if(!samples.equals(result))
throw new AssertionError();
При наличии достаточного количества свободных ядер ЦП будет продемонстрировано, как страницы извлекаются одновременно, а следовательно, неупорядоченно, в то время как результат будет правильно в порядке обращения. Вы также можете проверить параллелизм подстраницы, который применяется, когда страниц меньше:
Set<Thread> threads=ConcurrentHashMap.newKeySet();
List<Integer> samples=IntStream.range(0, 1_000_000).boxed().collect(Collectors.toList());
List<Integer> result=asSlowPagedSource(500_000, samples)
.peek(x -> threads.add(Thread.currentThread()))
.collect(Collectors.toList());
if(!samples.equals(result))
throw new AssertionError();
System.out.println("Concurrency: "+threads.size());
https://docs.oracle.com/javase/8/docs/api/java/util/Spliterator.html
Насколько я понимаю, скорость разделения происходит от неизменности. Чем более неизменен источник, тем быстрее обработка, поскольку неизменность лучше обеспечивает параллельную обработку или, скорее, разделение.
Идея заключается в том, чтобы как можно лучше адресовать изменения, если таковые имеются, к источнику, прежде чем связывать его в целом (лучше) или по частям (обычно так, а следовательно, с вашим и многими другими вызовами) со сплитераторами.
В вашем случае это может означать, что сначала должны соблюдаться размеры страниц, а не:
//.. in case the limit was not respected exactly.
this.offset += result.getPage().size();
Это также может означать, что поток потока должен быть подготовлен, а не использоваться в качестве прямого источника.
В конце документа приведен пример того, "как среда параллельных вычислений, такая как пакет java.util.stream, будет использовать Spliterator в параллельных вычислениях"
Обратите внимание, что поток использует сплитератор, а не тот, который использует поток в качестве источника.
В конце примера есть интересный метод "вычисления".
PS, если вы когда-нибудь получите общий эффективный класс PageSpliterator, обязательно сообщите об этом некоторым из нас.
веселит.