
java - How do I create a general-purpose paging spliterator?

Reposted · Author: IT老高 · Updated: 2023-10-28 20:23:47

I want to be able to process a Java stream read from a source that has to be accessed in pages. As a first approach, I implemented a paging iterator that simply requests a page whenever the current one runs out of items, and then used StreamSupport.stream(iterator, false) to get a stream handle over that iterator.
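The naive first attempt described above might look roughly like this (a sketch; the `pageFetcher` function and `NaivePagedStream` name are hypothetical stand-ins for the expensive page query):

```java
import java.util.*;
import java.util.function.BiFunction;
import java.util.stream.*;

public class NaivePagedStream {
    // Sketch of the iterator-based approach: fetch a page lazily whenever
    // the current page is exhausted. `pageFetcher` (hypothetical) maps
    // (offset, limit) to the items of one page.
    static <T> Stream<T> naivePagedStream(
            BiFunction<Long, Long, List<T>> pageFetcher, long pageSize) {
        Iterator<T> it = new Iterator<T>() {
            long offset = 0;
            Iterator<T> page = Collections.emptyIterator();
            boolean exhausted = false;

            public boolean hasNext() {
                if (!page.hasNext() && !exhausted) {
                    List<T> next = pageFetcher.apply(offset, pageSize);
                    offset += next.size();
                    exhausted = next.isEmpty();
                    page = next.iterator();
                }
                return page.hasNext();
            }

            public T next() {
                if (!hasNext()) throw new NoSuchElementException();
                return page.next();
            }
        };
        // An unknown-size iterator spliterator splits poorly, which is why
        // this approach yields essentially no parallelism.
        return StreamSupport.stream(
            Spliterators.spliteratorUnknownSize(it, Spliterator.ORDERED), true);
    }
}
```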

Since I found that my pages are very expensive to fetch, I wanted to access them via a parallel stream. At this point I discovered that the parallelism offered by my naive approach was nonexistent, due to the spliterator implementation that Java builds directly from an iterator. Since I actually know quite a lot about the elements I want to traverse (I know the total result count after requesting the first page, and the source supports offsets and limits), I think it should be possible to implement my own spliterator that achieves real concurrency (both in the work done on a page's elements and in the querying of pages).

I was able to implement the "work done on the elements" concurrency fairly easily, but in my initial implementation the querying of pages is only ever done by the topmost spliterator, so it gains nothing from the work division offered by the fork-join implementation.

How can I write a spliterator that achieves both of these goals?

For reference, I'll provide what I have done so far (I know it does not divide the queries appropriately).

public final class PagingSourceSpliterator<T> implements Spliterator<T> {

    public static final long DEFAULT_PAGE_SIZE = 100;

    private Page<T> result;
    private Iterator<T> results;
    private boolean needsReset = false;
    private final PageProducer<T> generator;
    private long offset = 0L;
    private long limit = DEFAULT_PAGE_SIZE;

    public PagingSourceSpliterator(PageProducer<T> generator) {
        this.generator = generator;
    }

    public PagingSourceSpliterator(long pageSize, PageProducer<T> generator) {
        this.generator = generator;
        this.limit = pageSize;
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        if (hasAnotherElement()) {
            if (!results.hasNext()) {
                loadPageAndPrepareNextPaging();
            }
            if (results.hasNext()) {
                action.accept(results.next());
                return true;
            }
        }
        return false;
    }

    @Override
    public Spliterator<T> trySplit() {
        // if we know there's another page, go ahead and hand off whatever
        // remains of this spliterator as a new spliterator for other
        // threads to work on, and then mark that next time something is
        // requested from this spliterator it needs to be reset to the head
        // of the next page
        if (hasAnotherPage()) {
            Spliterator<T> other = result.getPage().spliterator();
            needsReset = true;
            return other;
        } else {
            return null;
        }
    }

    @Override
    public long estimateSize() {
        if (limit == 0) {
            return 0;
        }
        ensureStateIsUpToDateEnoughToAnswerInquiries();
        return result.getTotalResults();
    }

    @Override
    public int characteristics() {
        return IMMUTABLE | ORDERED | DISTINCT | NONNULL | SIZED | SUBSIZED;
    }

    private boolean hasAnotherElement() {
        ensureStateIsUpToDateEnoughToAnswerInquiries();
        return isBound() && (results.hasNext() || hasAnotherPage());
    }

    private boolean hasAnotherPage() {
        ensureStateIsUpToDateEnoughToAnswerInquiries();
        return isBound() && (result.getTotalResults() > offset);
    }

    private boolean isBound() {
        return Objects.nonNull(results) && Objects.nonNull(result);
    }

    private void ensureStateIsUpToDateEnoughToAnswerInquiries() {
        ensureBound();
        ensureResetIfNecessary();
    }

    private void ensureBound() {
        if (!isBound()) {
            loadPageAndPrepareNextPaging();
        }
    }

    private void ensureResetIfNecessary() {
        if (needsReset) {
            loadPageAndPrepareNextPaging();
            needsReset = false;
        }
    }

    private void loadPageAndPrepareNextPaging() {
        // keep track of the overall result so that we can reference the original list and total size
        this.result = generator.apply(offset, limit);

        // make sure that the iterator we use to traverse a single page removes
        // results from the underlying list as we go so that we can simply pass
        // off the list spliterator for the trySplit rather than constructing a
        // new kind of spliterator for what remains.
        this.results = new DelegatingIterator<T>(result.getPage().listIterator()) {
            @Override
            public T next() {
                T next = super.next();
                this.remove();
                return next;
            }
        };

        // update the paging for the next request and inquiries prior to the next request
        // we use the page of the actual result set instead of the limit in case the limit
        // was not respected exactly.
        this.offset += result.getPage().size();
    }

    public static class DelegatingIterator<T> implements Iterator<T> {

        private final Iterator<T> iterator;

        public DelegatingIterator(Iterator<T> iterator) {
            this.iterator = iterator;
        }

        @Override
        public boolean hasNext() {
            return iterator.hasNext();
        }

        @Override
        public T next() {
            return iterator.next();
        }

        @Override
        public void remove() {
            iterator.remove();
        }

        @Override
        public void forEachRemaining(Consumer<? super T> action) {
            iterator.forEachRemaining(action);
        }
    }
}

And my page source:

public interface PageProducer<T> extends BiFunction<Long, Long, Page<T>> {
}

And a page:

public final class Page<T> {

    private long totalResults;
    private final List<T> page = new ArrayList<>();

    public long getTotalResults() {
        return totalResults;
    }

    public List<T> getPage() {
        return page;
    }

    public Page<T> setTotalResults(long totalResults) {
        this.totalResults = totalResults;
        return this;
    }

    public Page<T> setPage(List<T> results) {
        this.page.clear();
        this.page.addAll(results);
        return this;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof Page)) {
            return false;
        }
        Page<?> page1 = (Page<?>) o;
        return totalResults == page1.totalResults && Objects.equals(page, page1.page);
    }

    @Override
    public int hashCode() {
        return Objects.hash(totalResults, page);
    }
}

And an example of obtaining a stream with "slow" paging for testing:

private <T> Stream<T> asSlowPagedSource(long pageSize, List<T> things) {

    PageProducer<T> producer = (offset, limit) -> {

        try {
            Thread.sleep(5000L);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }

        int beginIndex = offset.intValue();
        int endIndex = Math.min(offset.intValue() + limit.intValue(), things.size());
        return new Page<T>().setTotalResults(things.size())
                            .setPage(things.subList(beginIndex, endIndex));
    };

    return StreamSupport.stream(new PagingSourceSpliterator<>(pageSize, producer), true);
}

Best Answer

The main reason your spliterator is not bringing you closer to your goal is that it tries to split the pages rather than the source element space. If you know the total number of elements and have a source that allows fetching pages via offset and limit, the most natural form of spliterator is to encapsulate a range of those elements, e.g. via offset and limit or end. Splitting then means just splitting that range: adjusting the spliterator's offset to the split position and creating a new spliterator representing the prefix, from the "old offset" to the split position.

Before splitting:
this spliterator: offset=x, end=y
After splitting:
this spliterator: offset=z, end=y
returned spliterator: offset=x, end=z

x <= z <= y

While in the best case z lies exactly in the middle between x and y to produce a balanced split, in our case we will adjust it slightly to produce working sets that are multiples of the page size.
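The page-aligned split-point arithmetic described above can be sketched in isolation (the `SplitPointDemo` class name is hypothetical; the arithmetic mirrors what the answer's trySplit does):

```java
// Demonstrates the page-aligned split-point computation: take the
// overflow-safe midpoint of [start, end), round it down to a multiple of
// the page size, and nudge it forward if rounding collapsed it onto start,
// so the returned prefix is never empty.
public class SplitPointDemo {
    static long splitPoint(long start, long end, long pageSize) {
        long mid = (start + end) >>> 1;    // overflow-safe midpoint
        mid = mid / pageSize * pageSize;   // align to a page boundary
        if (mid == start) mid += pageSize; // ensure a non-empty prefix
        return mid;
    }

    public static void main(String[] args) {
        System.out.println(splitPoint(0, 1000, 100));  // 500: already aligned
        System.out.println(splitPoint(0, 550, 100));   // 200: 275 rounded down
        System.out.println(splitPoint(100, 250, 100)); // 200: nudged off start
    }
}
```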

This logic does not require fetching pages, so if you defer fetching a page until the framework wants to start traversal, i.e. after splitting, the fetch operations can run in parallel. The biggest obstacle is that you need to fetch the first page in order to learn the total number of elements. The solution below separates the first fetch from all the others, which simplifies the implementation. Of course, it then has to hand over the result of that first page fetch, which will either be consumed on the first traversal (in the sequential case) or returned as the first split prefix, accepting one unbalanced split at that point but never having to deal with it again afterwards.

public class PagingSpliterator<T> implements Spliterator<T> {
    public interface PageFetcher<T> {
        List<T> fetchPage(long offset, long limit, LongConsumer totalSizeSink);
    }
    public static final long DEFAULT_PAGE_SIZE = 100;

    public static <T> Stream<T> paged(PageFetcher<T> pageAccessor) {
        return paged(pageAccessor, DEFAULT_PAGE_SIZE, false);
    }
    public static <T> Stream<T> paged(PageFetcher<T> pageAccessor,
                                      long pageSize, boolean parallel) {
        if(pageSize<=0) throw new IllegalArgumentException();
        return StreamSupport.stream(() -> {
            PagingSpliterator<T> pgSp
                = new PagingSpliterator<>(pageAccessor, 0, 0, pageSize);
            pgSp.danglingFirstPage
                = spliterator(pageAccessor.fetchPage(0, pageSize, l -> pgSp.end=l));
            return pgSp;
        }, CHARACTERISTICS, parallel);
    }
    private static final int CHARACTERISTICS = IMMUTABLE|ORDERED|SIZED|SUBSIZED;

    private final PageFetcher<T> supplier;
    long start, end, pageSize;
    Spliterator<T> currentPage, danglingFirstPage;

    PagingSpliterator(PageFetcher<T> supplier,
                      long start, long end, long pageSize) {
        this.supplier = supplier;
        this.start = start;
        this.end = end;
        this.pageSize = pageSize;
    }

    public boolean tryAdvance(Consumer<? super T> action) {
        for(;;) {
            if(ensurePage().tryAdvance(action)) return true;
            if(start>=end) return false;
            currentPage=null;
        }
    }
    public void forEachRemaining(Consumer<? super T> action) {
        do {
            ensurePage().forEachRemaining(action);
            currentPage=null;
        } while(start<end);
    }
    public Spliterator<T> trySplit() {
        if(danglingFirstPage!=null) {
            Spliterator<T> fp=danglingFirstPage;
            danglingFirstPage=null;
            start=fp.getExactSizeIfKnown();
            return fp;
        }
        if(currentPage!=null)
            return currentPage.trySplit();
        if(end-start>pageSize) {
            long mid=(start+end)>>>1;
            mid=mid/pageSize*pageSize;
            if(mid==start) mid+=pageSize;
            return new PagingSpliterator<>(supplier, start, start=mid, pageSize);
        }
        return ensurePage().trySplit();
    }
    /**
     * Fetch data immediately before traversing or sub-page splitting.
     */
    private Spliterator<T> ensurePage() {
        if(danglingFirstPage!=null) {
            Spliterator<T> fp=danglingFirstPage;
            danglingFirstPage=null;
            currentPage=fp;
            start=fp.getExactSizeIfKnown();
            return fp;
        }
        Spliterator<T> sp = currentPage;
        if(sp==null) {
            if(start>=end) return Spliterators.emptySpliterator();
            sp = spliterator(supplier.fetchPage(
                     start, Math.min(end-start, pageSize), l->{}));
            start += sp.getExactSizeIfKnown();
            currentPage=sp;
        }
        return sp;
    }
    /**
     * Ensure that the sub-spliterator provided by the List is compatible with
     * ours, i.e. is {@code SIZED | SUBSIZED}. For standard List implementations,
     * the spliterators are, so the costs of dumping into an intermediate array
     * in the other case is irrelevant.
     */
    private static <E> Spliterator<E> spliterator(List<E> list) {
        Spliterator<E> sp = list.spliterator();
        if((sp.characteristics()&(SIZED|SUBSIZED))!=(SIZED|SUBSIZED))
            sp = Spliterators.spliterator(
                     StreamSupport.stream(sp, false).toArray(), IMMUTABLE | ORDERED);
        return sp;
    }
    public long estimateSize() {
        if(currentPage!=null) return currentPage.estimateSize();
        return end-start;
    }
    public int characteristics() {
        return CHARACTERISTICS;
    }
}

It uses a specialized PageFetcher functional interface, which is implemented by calling the accept method of the callback with the total size and returning the list of items. The paging spliterator simply delegates to the list's spliterator for traversal, and if the concurrency is significantly higher than the number of resulting pages, it may even profit from splitting these page spliterators, which implies that random-access lists like ArrayList are the preferred list type here.
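The fallback inside the answer's `spliterator(List)` helper only kicks in for lists whose spliterators are not SIZED and SUBSIZED; for ArrayList it passes the list spliterator straight through. That assumption can be probed directly (the `SpliteratorCharacteristicsProbe` class name is hypothetical):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;

public class SpliteratorCharacteristicsProbe {
    public static void main(String[] args) {
        List<Integer> list = new ArrayList<>(List.of(1, 2, 3));
        Spliterator<Integer> sp = list.spliterator();
        int wanted = Spliterator.SIZED | Spliterator.SUBSIZED;
        // ArrayList's spliterator reports both flags, so the helper hands
        // it through without dumping the page into an intermediate array.
        System.out.println((sp.characteristics() & wanted) == wanted); // true
    }
}
```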

Adapting your example code to it:

private static <T> Stream<T> asSlowPagedSource(long pageSize, List<T> things) {
    return PagingSpliterator.paged( (offset, limit, totalSizeSink) -> {
        totalSizeSink.accept(things.size());
        if(offset>things.size()) return Collections.emptyList();
        int beginIndex = (int)offset;
        assert beginIndex==offset;
        int endIndex = Math.min(beginIndex+(int)limit, things.size());
        System.out.printf("Page %6d-%6d:\t%s%n",
                          beginIndex, endIndex, Thread.currentThread());
        // artificial slowdown
        LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
        return things.subList(beginIndex, endIndex);
    }, pageSize, true);
}

You can test it like this:

List<Integer> samples = IntStream.range(0, 555_000).boxed().collect(Collectors.toList());
List<Integer> result  = asSlowPagedSource(10_000, samples).collect(Collectors.toList());
if(!samples.equals(result))
    throw new AssertionError();

If there are enough free CPU cores, it will demonstrate that the pages are fetched concurrently, hence out of order, while the result will correctly be in encounter order. You can also test the sub-page concurrency that applies when there are fewer pages:

Set<Thread> threads = ConcurrentHashMap.newKeySet();
List<Integer> samples = IntStream.range(0, 1_000_000).boxed().collect(Collectors.toList());
List<Integer> result = asSlowPagedSource(500_000, samples)
    .peek(x -> threads.add(Thread.currentThread()))
    .collect(Collectors.toList());
if(!samples.equals(result))
    throw new AssertionError();
System.out.println("Concurrency: "+threads.size());

Regarding "java - How do I create a general-purpose paging spliterator?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/38128274/
