gpt4 book ai didi

java - 在并行流式处理之前或期间有效地预处理 CSV 数据

转载 作者:行者123 更新时间:2023-11-29 04:25:40 24 4
gpt4 key购买 nike

我正在寻找一种在将 CSV 数据转储到 Java 流之前(或同时)对其进行预处理的有效方法。

在正常情况下我会做这样的事情来处理文件:

File input = new File("helloworld.csv");
InputStream is = new FileInputStream(input);
BufferedReader br = new BufferedReader(new InputStreamReader(is));
br.lines().parallel().forEach(line -> {
System.out.println(line);
});

但是在当前这种情况下,我需要在流式传输记录之前或期间对记录进行预处理,并且我 Collection 中的每个项目都可能依赖于前一个。这是一个简单的 CSV 文件示例,用于演示该问题:

species, breed, name
dog, lab, molly
, greyhound, stella
, beagle, stanley
cat, siamese, toby
, persian, fluffy

在我的示例 CSV 中,物种列仅在它从一条记录更改为另一条记录时才被填充。我知道简单的答案是修复我的 CSV 输出,但在这种情况下这是不可能的。

我正在寻找一种合理有效的方法来处理来自 CSV 的记录,如果空白,则从先前的记录中复制物种值,然后在预处理后传递给并行流。

下游处理可能需要很长时间,所以我最终需要在预处理完成后并行处理。我的 CSV 文件也可能很大,因此我想避免先将整个文件加载到内存中的对象中。

我希望有一些方法可以做类似下面的事情(警告错误的伪代码):

parallelStream.startProcessing

while read line {
if (line.doesntHaveSpecies) {
line.setSpecies
}
parallelStream.add(line)
}

我目前的解决方案是处理整个文件并“修复它”,然后将其流式传输。由于文件可能很大,最好在“修复”记录后且在处理整个文件之前立即开始处理记录。

最佳答案

您必须将状态封装到 Spliterator 中。

private static Stream<String> getStream(BufferedReader br) {
return StreamSupport.stream(
new Spliterators.AbstractSpliterator<String>(
100, Spliterator.ORDERED|Spliterator.NONNULL) {
String prev;
public boolean tryAdvance(Consumer<? super String> action) {
try {
String next = br.readLine();
if(next==null) return false;
final int ix = next.indexOf(',');
if(ix==0) {
if(prev==null)
throw new IllegalStateException("first line without value");
next = prev+next;
}
else prev=ix<0? next: next.substring(0, ix);
action.accept(next);
return true;
} catch (IOException ex) {
throw new UncheckedIOException(ex);
}
}
}, false);
}

可以用作

try(Reader r = new FileReader(input);
BufferedReader br = new BufferedReader(r)) {

getStream(br).forEach(System.out::println);
}

Spliterator 将始终按顺序遍历。如果打开并行处理,Stream 实现将尝试通过调用 trySplit 为其他线程获取新的 Spliterator 实例。由于我们无法为该操作提供有效的策略,因此我们从 AbstractSpliterator 继承默认值,它将执行一些基于数组的缓冲。这将始终正常工作,但只有在后续流管道中有大量计算时才会得到返回。否则,您可能只是继续使用顺序执行。

关于java - 在并行流式处理之前或期间有效地预处理 CSV 数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46472785/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com