
java - Is it possible to create a stream from an object that only exposes the "readNext" part of an iterator?

Reposted. Author: 塔克拉玛干. Updated: 2023-11-03 04:07:08

I am trying to read from a CSV file, but because of its size I want to do so without first loading all of it into memory.

The library I found for reading CSV is opencsv. It works well, but it only exposes two methods:

readAll() 

readNext() 

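readAll is out, since I don't want the whole file in memory at once, so I want to read lazily from the file through readNext. Ideally, I would like to end up consuming it as a stream.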

The closest I have gotten is passing the readNext method to a Stream.generate construct:

Stream csvDataStream = Stream.generate(csvReader::readNext); 

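But this obviously has the huge drawback of throwing an error once the underlying csvReader's iterator is exhausted, and I really don't want to wrap my entire program in a try/catch block just because I'm using the language wrong. Is there a way to create a stream from something that only exposes a next method?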
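On Java 9 and later, one way to make the Stream.generate idea terminate is the three-argument Stream.iterate(seed, hasNext, next), which stops at the first element that fails hasNext. The sketch below assumes a hypothetical FakeReader whose readNext() returns null at end of input and throws no checked exceptions; opencsv's real readNext declares checked exceptions (at least IOException), so it would need wrapping first. Note that the first row is read eagerly, when the stream is created.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class IterateUntilNull {
    /** Hypothetical reader: readNext() returns the next row, or null at end of input. */
    static final class FakeReader {
        private final Iterator<String[]> rows;
        FakeReader(List<String[]> rows) { this.rows = rows.iterator(); }
        String[] readNext() { return rows.hasNext() ? rows.next() : null; }
    }

    /** Java 9+: emits rows until readNext() first returns null; the seed row is read eagerly. */
    static Stream<String[]> rows(FakeReader reader) {
        return Stream.iterate(reader.readNext(), Objects::nonNull, prev -> reader.readNext());
    }

    /** Collects the first column of every row, reading lazily through the stream. */
    static List<String> firstColumns(List<String[]> input) {
        return rows(new FakeReader(input))
                .map(row -> row[0])
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String[]> input = Arrays.asList(new String[] {"a", "1"}, new String[] {"b", "2"});
        System.out.println(firstColumns(input)); // prints [a, b]
    }
}
```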

Best answer

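Here is a ready-made implementation from my project. I have an abstract spliterator that handles splitting into fixed-size batches and allows efficient parallel processing of any kind of I/O-based stream source: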

import static java.util.Spliterators.spliterator;

import java.util.Comparator;
import java.util.Spliterator;
import java.util.function.Consumer;

/**
 * Base class for spliterators over sequential (e.g. I/O) sources. Subclasses implement
 * tryAdvance; trySplit copies up to batchSize elements into an array so that the array
 * part can be processed in parallel while this spliterator keeps reading the source.
 */
public abstract class FixedBatchSpliteratorBase<T> implements Spliterator<T> {
    private final int batchSize;
    private final int characteristics;
    private long est;

    public FixedBatchSpliteratorBase(int characteristics, int batchSize, long est) {
        characteristics |= ORDERED;
        if ((characteristics & SIZED) != 0) characteristics |= SUBSIZED;
        this.characteristics = characteristics;
        this.batchSize = batchSize;
        this.est = est;
    }
    public FixedBatchSpliteratorBase(int characteristics, int batchSize) {
        this(characteristics, batchSize, Long.MAX_VALUE);
    }
    public FixedBatchSpliteratorBase(int characteristics) {
        this(characteristics, 64, Long.MAX_VALUE);
    }

    @Override public Spliterator<T> trySplit() {
        final HoldingConsumer<T> holder = new HoldingConsumer<>();
        if (!tryAdvance(holder)) return null;   // source exhausted: nothing to split off
        final Object[] a = new Object[batchSize];
        int j = 0;
        // Fill the batch with up to batchSize elements read from the source.
        do a[j] = holder.value; while (++j < batchSize && tryAdvance(holder));
        if (est != Long.MAX_VALUE) est -= j;    // Long.MAX_VALUE means "size unknown"
        return spliterator(a, 0, j, characteristics());
    }
    @Override public Comparator<? super T> getComparator() {
        if (hasCharacteristics(SORTED)) return null;
        throw new IllegalStateException();
    }
    @Override public long estimateSize() { return est; }
    @Override public int characteristics() { return characteristics; }

    /** Captures the single element passed to tryAdvance. */
    static final class HoldingConsumer<T> implements Consumer<T> {
        Object value;
        @Override public void accept(T value) { this.value = value; }
    }
}
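To make the batching in trySplit concrete, here is a stripped-down, self-contained sketch of the same idea rather than the class above: it pulls up to batchSize elements from a sequential source into an array and hands that array off as a new spliterator, leaving the rest of the source with the original. IteratorBatchSpliterator is a hypothetical name, and a plain Iterator stands in for the I/O source.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;

/** Hypothetical sketch: splits off fixed-size array batches from a sequential source. */
public class IteratorBatchSpliterator<T> implements Spliterator<T> {
    private final Iterator<T> source;
    private final int batchSize;

    public IteratorBatchSpliterator(Iterator<T> source, int batchSize) {
        this.source = source;
        this.batchSize = batchSize;
    }

    @Override public boolean tryAdvance(Consumer<? super T> action) {
        if (!source.hasNext()) return false;
        action.accept(source.next());
        return true;
    }

    /** Copies up to batchSize elements into an array-backed spliterator (the prefix). */
    @Override public Spliterator<T> trySplit() {
        if (!source.hasNext()) return null;
        Object[] batch = new Object[batchSize];
        int n = 0;
        while (n < batchSize && source.hasNext()) batch[n++] = source.next();
        return Spliterators.spliterator(batch, 0, n, characteristics());
    }

    @Override public long estimateSize() { return Long.MAX_VALUE; } // size unknown
    @Override public int characteristics() { return ORDERED | NONNULL; }

    /** Returns {size of the first split-off batch, number of elements left afterwards}. */
    static long[] splitOnce(List<Integer> data, int batchSize) {
        IteratorBatchSpliterator<Integer> rest = new IteratorBatchSpliterator<>(data.iterator(), batchSize);
        Spliterator<Integer> prefix = rest.trySplit();
        long[] left = {0};
        rest.forEachRemaining(x -> left[0]++);
        return new long[] {prefix.estimateSize(), left[0]};
    }

    public static void main(String[] args) {
        long[] r = splitOnce(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3);
        System.out.println(r[0] + " " + r[1]); // prints "3 4"
    }
}
```

The array-backed prefix is SIZED, so the stream framework can split it further and hand its pieces to other threads, while the original spliterator remains the only one touching the sequential source.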

And here is the opencsv spliterator based on it:

import java.util.function.Consumer;

// Package for newer opencsv releases; older ones (e.g. 2.3) use au.com.bytecode.opencsv.CSVReader.
import com.opencsv.CSVReader;

// uncheckRun and uncheckCall are static-imported from the Util helper class shown below.
public class CsvSpliterator extends FixedBatchSpliteratorBase<String[]> {
    private final CSVReader cr;

    CsvSpliterator(CSVReader cr, int batchSize) {
        super(NONNULL, batchSize);
        if (cr == null) throw new NullPointerException("CSVReader is null");
        this.cr = cr;
    }
    public CsvSpliterator(CSVReader cr) { this(cr, 100); }

    @Override public void forEachRemaining(Consumer<? super String[]> action) {
        if (action == null) throw new NullPointerException();
        // readNext() returns null at end of input.
        uncheckRun(() -> { for (String[] row; (row = cr.readNext()) != null;) action.accept(row); });
    }
    @Override public boolean tryAdvance(Consumer<? super String[]> action) {
        if (action == null) throw new NullPointerException();
        return uncheckCall(() -> {
            final String[] row = cr.readNext();
            if (row == null) return false;   // end of input
            action.accept(row);
            return true;
        });
    }
}
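If parallel batching is not needed, the same tryAdvance pattern also fits into the JDK's Spliterators.AbstractSpliterator, which only requires tryAdvance and already implements trySplit to permit limited parallelism. A minimal sketch, using BufferedReader.readLine() as a stand-in because it has the same contract as readNext() (next item, or null at end of input). Wrapping the checked IOException in UncheckedIOException is a swap for the answer's sneaky-throw helpers, not what the answer does; for plain lines the JDK already offers BufferedReader.lines().

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class ReadNextStream {
    /** Streams lines lazily; readLine() returning null ends the stream. */
    static Stream<String> lines(BufferedReader reader) {
        Spliterator<String> sp = new Spliterators.AbstractSpliterator<String>(
                Long.MAX_VALUE, Spliterator.ORDERED | Spliterator.NONNULL) {
            @Override public boolean tryAdvance(Consumer<? super String> action) {
                final String line;
                try {
                    line = reader.readLine();
                } catch (IOException e) {
                    throw new UncheckedIOException(e); // wrap instead of sneaky-throwing
                }
                if (line == null) return false;        // end of input
                action.accept(line);
                return true;
            }
        };
        // Closing the stream closes the reader.
        return StreamSupport.stream(sp, false).onClose(() -> {
            try { reader.close(); } catch (IOException e) { throw new UncheckedIOException(e); }
        });
    }

    /** Counts non-empty lines of the given text via the lazy stream. */
    static long countNonEmpty(String text) {
        try (Stream<String> s = lines(new BufferedReader(new StringReader(text)))) {
            return s.filter(l -> !l.isEmpty()).count();
        }
    }

    public static void main(String[] args) {
        System.out.println(countNonEmpty("a,1\n\nb,2\nc,3\n")); // prints 3
    }
}
```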

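The uncheckRun and uncheckCall helpers live in a Util class. They rethrow checked exceptions such as IOException without declaring or wrapping them: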

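import java.util.concurrent.Callable;

public final class Util {
    private Util() {}

    /** A Runnable whose run() may throw a checked exception. */
    @FunctionalInterface
    public interface RunnableExc { void run() throws Exception; }

    /** Calls callable, rethrowing any exception unchanged (not wrapped). */
    public static <T> T uncheckCall(Callable<T> callable) {
        try { return callable.call(); }
        catch (Exception e) { return sneakyThrow(e); }
    }
    public static void uncheckRun(RunnableExc r) {
        try { r.run(); } catch (Exception e) { sneakyThrow(e); }
    }
    public static <T> T sneakyThrow(Throwable e) {
        return Util.<RuntimeException, T>sneakyThrow0(e);
    }
    // The unchecked cast is erased at runtime, so the original checked exception escapes as-is.
    @SuppressWarnings("unchecked")
    private static <E extends Throwable, T> T sneakyThrow0(Throwable t) throws E { throw (E) t; }
}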
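A small check of what these helpers do, written as a self-contained copy (the class name SneakyDemo and the method readConfig are hypothetical): the checked IOException reaches the caller as the original object, not wrapped in a RuntimeException, even though uncheckCall declares no checked exceptions. The trade-off is that the compiler no longer forces callers to handle it, and a catch clause for IOException around such a call would not even compile.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

public class SneakyDemo {
    public static <T> T uncheckCall(Callable<T> callable) {
        try { return callable.call(); }
        catch (Exception e) { return sneakyThrow(e); }
    }
    public static <T> T sneakyThrow(Throwable e) {
        return SneakyDemo.<RuntimeException, T>sneakyThrow0(e);
    }
    @SuppressWarnings("unchecked")
    private static <E extends Throwable, T> T sneakyThrow0(Throwable t) throws E { throw (E) t; }

    /** Hypothetical I/O call that always fails with a checked exception. */
    static String readConfig() throws IOException {
        throw new IOException("disk gone");
    }

    /** Returns the class name of whatever uncheckCall lets escape. */
    static String escapedType() {
        try {
            uncheckCall(SneakyDemo::readConfig); // no throws clause or try/catch required here
            return "none";
        } catch (Exception e) { // catching IOException directly would not compile here
            return e.getClass().getName();
        }
    }

    public static void main(String[] args) {
        System.out.println(escapedType()); // prints java.io.IOException
    }
}
```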

Usage:

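import static java.util.stream.StreamSupport.stream;

// ...

// yourInputStream and separator are supplied by the caller. The CSVReader(Reader, char, char)
// constructor is from older opencsv releases; newer ones may require CSVReaderBuilder instead.
final CSVReader cr = new CSVReader(new InputStreamReader(yourInputStream), separator, '"');
// true = parallel stream; closing the stream closes the reader.
return stream(new CsvSpliterator(cr), true).onClose(() -> uncheckRun(cr::close));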
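The returned stream owns the CSVReader, and its onClose handler runs only when close() is called on the stream; running a terminal operation does not close it. A caller would therefore typically consume the stream in try-with-resources. A minimal self-contained sketch of that behavior, with a hypothetical TrackedSource standing in for the reader:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

public class CloseDemo {
    /** Hypothetical resource standing in for the CSVReader. */
    static final class TrackedSource implements AutoCloseable {
        final AtomicBoolean closed = new AtomicBoolean();
        @Override public void close() { closed.set(true); }
    }

    /** Mirrors the usage above: the stream's onClose handler releases the source. */
    static Stream<String> open(TrackedSource src) {
        return Stream.of("a", "b", "c").onClose(src::close);
    }

    /** Returns {closed after the terminal op, closed after try-with-resources}. */
    static boolean[] run() {
        TrackedSource src = new TrackedSource();
        boolean afterTerminalOp = false;
        try (Stream<String> rows = open(src)) {
            rows.count(); // a terminal operation does not close the stream
            afterTerminalOp = src.closed.get();
        }
        return new boolean[] {afterTerminalOp, src.closed.get()};
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println(r[0] + " " + r[1]); // prints "false true"
    }
}
```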

A similar question, "java - Is it possible to create a stream from an object that only exposes the "readNext" part of an iterator?", can be found on Stack Overflow: https://stackoverflow.com/questions/24458961/
