gpt4 book ai didi

java - 收集器将流分成给定大小的 block

转载 作者:塔克拉玛干 更新时间:2023-11-02 08:32:37 25 4
gpt4 key购买 nike

我手头有一个问题,我正试图用一些我很确定我不应该做的事情来解决,但没有看到替代方案。我得到了一个字符串列表,应该将它分成给定大小的 block 。然后必须将结果传递给某种方法以进行进一步处理。由于列表可能很大,因此处理应该异步完成。

我的方法是创建一个自定义收集器,它接受字符串流并将其转换为 Stream >:

final Stream<List<Long>> chunks = list
.stream()
.parallel()
.collect(MyCollector.toChunks(CHUNK_SIZE))
.flatMap(p -> doStuff(p))
.collect(MyCollector.toChunks(CHUNK_SIZE))
.map(...)
...

收集器的代码:

public final class MyCollector<T, A extends List<List<T>>, R extends Stream<List<T>>> implements Collector<T, A, R> {
private final AtomicInteger index = new AtomicInteger(0);
private final AtomicInteger current = new AtomicInteger(-1);
private final int chunkSize;

private MyCollector(final int chunkSize){
this.chunkSize = chunkSize;
}

@Override
public Supplier<A> supplier() {
return () -> (A)new ArrayList<List<T>>();
}

@Override
public BiConsumer<A, T> accumulator() {
return (A candidate, T acc) -> {
if (index.getAndIncrement() % chunkSize == 0){
candidate.add(new ArrayList<>(chunkSize));
current.incrementAndGet();
}
candidate.get(current.get()).add(acc);
};
}

@Override
public BinaryOperator<A> combiner() {
return (a1, a2) -> {
a1.addAll(a2);
return a1;
};
}
@Override
public Function<A, R> finisher() {
return (a) -> (R)a.stream();
}

@Override
public Set<Characteristics> characteristics() {
return Collections.unmodifiableSet(EnumSet.of(Characteristics.CONCURRENT, Characteristics.UNORDERED));
}

public static <T> MyCollector<T, List<List<T>>, Stream<List<T>>> toChunks(final int chunkSize){
return new MyCollector<>(chunkSize);
}

这似乎在大多数情况下都有效,但有时我会遇到 NPE。我确信累加器中的 不是线程安全的,因为在将新列表添加到主列表时可能会有两个线程干扰。不过,我不介意 block 中的元素过多或过少。

我试过这个而不是当前的供应商功能:

 return () -> (A)new ArrayList<List<T>>(){{add(new ArrayList<T>());}};

确保始终存在一个列表。这根本不起作用并导致空列表。

问题:

  • 我很确定自定义 Spliterator 会是一个很好的解决方案。但是,它不适用于同步场景。另外,我确定调用了 Spliterator 吗?
    • 我知道我根本不应该有状态,但不确定如何更改它。

问题:

  • 这种方法是完全错误的还是可以通过某种方式解决的?
  • 如果我使用 Spliterator - 我可以确定它被调用了还是由底层实现决定?
  • 我很确定在供应商和终结者中对 (A) 和 (R) 的转换不是必需的,但 IntelliJ 提示。有什么我想念的吗?

编辑:

  • 我在客户端代码中添加了更多内容,因为 IntStream.range 的建议在链接时将不起作用。
  • 我意识到我可以按照评论中的建议采取不同的做法,但这也与风格有关,并且知道是否可行。
  • 我有 CONCURRENT 特性,因为我假设 Stream API 会退回到同步处理,否则。如前所述,该解决方案不是线程安全的。

如有任何帮助,我们将不胜感激。

最好的,

最佳答案

我还不能发表评论,但我想将以下链接发布到一个非常相似的问题(尽管据我所知不是重复的):Java 8 Stream with batch processing

您可能还对 GitHub 上的以下问题感兴趣:https://github.com/jOOQ/jOOL/issues/296


现在,您对 CONCURRENT 的使用特征错误 - 文档对 Collector.Characteristics.CONCURRENT 说了以下内容:

Indicates that this collector is concurrent, meaning that the result container can support the accumulator function being called concurrently with the same result container from multiple threads.

这意味着 supplier只被调用一次,combiner实际上永远不会被调用(参见 ReferencePipeline.collect() 方法的来源)。这就是为什么您有时会遇到 NPE。

因此,我建议使用您提出的简化版本:

public static <T> Collector<T, List<List<T>>, Stream<List<T>>> chunked(int chunkSize) {
return Collector.of(
ArrayList::new,
(outerList, item) -> {
if (outerList.isEmpty() || last(outerList).size() >= chunkSize) {
outerList.add(new ArrayList<>(chunkSize));
}
last(outerList).add(item);
},
(a, b) -> {
a.addAll(b);
return a;
},
List::stream,
Collector.Characteristics.UNORDERED
);
}

private static <T> T last(List<T> list) {
return list.get(list.size() - 1);
}

或者,您可以编写一个真正并发的 Collector使用适当的同步,但如果您不介意拥有多个大小小于 chunkSize 的列表(这是使用非并发 Collector 可以获得的效果,就像我上面提出的那样),我不会打扰。

关于java - 收集器将流分成给定大小的 block ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50869324/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com