gpt4 book ai didi

java - 你能重新平衡一个未知大小的不平衡 Spliterator 吗?

转载 作者:行者123 更新时间:2023-12-02 07:32:04 25 4
gpt4 key购买 nike

我想使用 Stream 并行处理一组未知数量的异构远程存储 JSON 文件(文件数量预先未知)。这些文件的大小差异很大,从每个文件 1 个 JSON 记录到某些其他文件中的 100,000 条记录。在这种情况下,JSON 记录表示一个独立的 JSON 对象,表示为文件中的一行。

我真的很想为此使用 Streams,因此我实现了这个 Spliterator:

public abstract class JsonStreamSpliterator<METADATA, RECORD> extends AbstractSpliterator<RECORD> {

abstract protected JsonStreamSupport<METADATA> openInputStream(String path);

abstract protected RECORD parse(METADATA metadata, Map<String, Object> json);

private static final int ADDITIONAL_CHARACTERISTICS = Spliterator.IMMUTABLE | Spliterator.DISTINCT | Spliterator.NONNULL;
private static final int MAX_BUFFER = 100;
private final Iterator<String> paths;
private JsonStreamSupport<METADATA> reader = null;

public JsonStreamSpliterator(Iterator<String> paths) {
this(Long.MAX_VALUE, ADDITIONAL_CHARACTERISTICS, paths);
}

private JsonStreamSpliterator(long est, int additionalCharacteristics, Iterator<String> paths) {
super(est, additionalCharacteristics);
this.paths = paths;
}

private JsonStreamSpliterator(long est, int additionalCharacteristics, Iterator<String> paths, String nextPath) {
this(est, additionalCharacteristics, paths);
open(nextPath);
}

@Override
public boolean tryAdvance(Consumer<? super RECORD> action) {
if(reader == null) {
String path = takeNextPath();
if(path != null) {
open(path);
}
else {
return false;
}
}
Map<String, Object> json = reader.readJsonLine();
if(json != null) {
RECORD item = parse(reader.getMetadata(), json);
action.accept(item);
return true;
}
else {
reader.close();
reader = null;
return tryAdvance(action);
}
}

private void open(String path) {
reader = openInputStream(path);
}

private String takeNextPath() {
synchronized(paths) {
if(paths.hasNext()) {
return paths.next();
}
}
return null;
}

@Override
public Spliterator<RECORD> trySplit() {
String nextPath = takeNextPath();
if(nextPath != null) {
return new JsonStreamSpliterator<METADATA,RECORD>(Long.MAX_VALUE, ADDITIONAL_CHARACTERISTICS, paths, nextPath) {
@Override
protected JsonStreamSupport<METADATA> openInputStream(String path) {
return JsonStreamSpliterator.this.openInputStream(path);
}
@Override
protected RECORD parse(METADATA metaData, Map<String,Object> json) {
return JsonStreamSpliterator.this.parse(metaData, json);
}
};
}
else {
List<RECORD> records = new ArrayList<RECORD>();
while(tryAdvance(records::add) && records.size() < MAX_BUFFER) {
// loop
}
if(records.size() != 0) {
return records.spliterator();
}
else {
return null;
}
}
}
}

我遇到的问题是,虽然 Stream 一开始并行得很好,但最终最大的文件还是在单个线程中处理。我相信最直接的原因是有据可查的: split 器“不平衡”。

更具体地说,在 Stream.forEach 生命周期的某个点之后,似乎不会调用 trySplit 方法,因此在trySplit 的末尾很少被执行。

请注意从 trySplit 返回的所有 spliterator 如何共享相同的 paths 迭代器。我认为这是平衡所有 spliterator 之间工作的一种非常聪明的方法,但它还不足以实现完全并行性。

我希望首先跨文件进行并行处理,然后当少数大文件仍然处于 split 状态时,我想跨剩余文件的 block 进行并行处理。这就是 trySplit 末尾的 else block 的意图。

有没有一种简单/简单/规范的方法可以解决这个问题?

最佳答案

您的 trySplit 应该输出相同大小的分割,无论底层文件的大小如何。您应该将所有文件视为一个单元,并每次使用相同数量的 JSON 对象填充 ArrayList 支持的分割器。对象的数量应确保处理一个拆分需要 1 到 10 毫秒:低于 1 毫秒,您开始接近将批处理移交给工作线程的成本,高于此值,您开始面临 CPU 负载不均匀的风险,因为任务粒度太粗。

分割器没有义务报告大小估计,并且您已经正确执行了此操作:您的估计是Long.MAX_VALUE,这是一个特殊值,表示“无界”。但是,如果您有许多包含单个 JSON 对象的文件,导致批量大小为 1,这将以两种方式损害您的性能:打开-读取-关闭文件的开销可能会成为瓶颈,并且如果您设法逃脱也就是说,与处理一项的成本相比,线程切换的成本可能会很大,从而再次导致瓶颈。

五年前我正在解决类似的问题,你可以看看my solution .

关于java - 你能重新平衡一个未知大小的不平衡 Spliterator 吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58601518/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com