gpt4 book ai didi

java-8 - 并行流重复项目

转载 作者:行者123 更新时间:2023-12-02 08:54:37 25 4
gpt4 key购买 nike

我正在从数据库中检索大块数据,并使用这些数据将其写入其他地方。为了避免长时间的处理时间,我尝试使用并行流来编写它。
当我将其作为顺序流运行时,它工作得很好。但是,如果我将其更改为并行,则行为会很奇怪:它会多次打印同一个对象(超过 10 次)。

@PostConstruct
public void retrieveAllTypeRecords() throws SQLException {
logger.info("Retrieve batch of Type records.");
try {
Stream<TypeRecord> typeQueryAsStream = jdbcStream.getTypeQueryAsStream();
typeQueryAsStream.forEach((type) -> {
logger.info("Printing Type with field1: {} and field2: {}.", type.getField1(), type.getField2()); //the same object gets printed here multiple times
//write this object somewhere else
});
logger.info("Completed full retrieval of Type data.");
} catch (Exception e) {
logger.error("error: " + e);
}
}

public Stream<TypeRecord> getTypeQueryAsStream() throws SQLException {
String sql = typeRepository.getQueryAllTypesRecords(); //retrieves SQL query in String format

TypeMapper typeMapper = new TypeMapper();

JdbcStream.StreamableQuery query = jdbcStream.streamableQuery(sql);
Stream<TypeRecord> stream = query.stream()
.map(row -> {
return typeMapper.mapRow(row); //maps columns values to object values
});
return stream;
}


public class StreamableQuery implements Closeable {

(...)

public Stream<SqlRow> stream() throws SQLException {
final SqlRowSet rowSet = new ResultSetWrappingSqlRowSet(preparedStatement.executeQuery());
final SqlRow sqlRow = new SqlRowAdapter(rowSet);

Supplier<Spliterator<SqlRow>> supplier = () -> Spliterators.spliteratorUnknownSize(new Iterator<SqlRow>() {
@Override
public boolean hasNext() {
return !rowSet.isLast();
}

@Override
public SqlRow next() {
if (!rowSet.next()) {
throw new NoSuchElementException();
}
return sqlRow;
}
}, Spliterator.CONCURRENT);
return StreamSupport.stream(supplier, Spliterator.CONCURRENT, true); //this boolean sets the stream as parallel
}
}

我也尝试过使用 typeQueryAsStream.parallel().forEach((type) 但结果是相同的。

输出示例:
[ForkJoinPool.commonPool-worker-1] INFO TypeService - 保存类型,字段 1:L6797 和字段 2:P1433。
[ForkJoinPool.commonPool-worker-1] INFO TypeService - 保存类型,字段 1:L6797 和字段 2:P1433。
[主要] INFO TypeService - 保存类型,字段 1:L6797 和字段 2:P1433。
[ForkJoinPool.commonPool-worker-1] INFO TypeService - 保存类型,字段 1:L6797 和字段 2:P1433。

最佳答案

好吧,看看你的代码,

    final SqlRow sqlRow = new SqlRowAdapter(rowSet);

Supplier<Spliterator<SqlRow>> supplier = () -> Spliterators.spliteratorUnknownSize(new Iterator<SqlRow>() {

@Override
public SqlRow next() {
if (!rowSet.next()) {
throw new NoSuchElementException();
}
return sqlRow;
}
}, Spliterator.CONCURRENT);

每次都返回相同的对象。您可以通过在调用rowSet.next()时隐式修改该对象的状态来达到您想要的效果。 .

当多个线程尝试同时访问该单个对象时,这显然不起作用。即使缓冲一些项目,将它们交给另一个线程也会引起麻烦。因此,一旦涉及有状态中间操作,这种干扰也会导致顺序流出现问题,例如 sorteddistinct .

假设typeMapper.mapRow(row)会产生一个实际的数据项,该数据项不会干扰其他数据项,您应该将此步骤集成到流源中,以创建一个有效的流。

public Stream<TypeRecord> stream(TypeMapper typeMapper) throws SQLException {
SqlRowSet rowSet = new ResultSetWrappingSqlRowSet(preparedStatement.executeQuery());
SqlRow sqlRow = new SqlRowAdapter(rowSet);

Spliterator<TypeRecord> sp = new Spliterators.AbstractSpliterator<TypeRecord>(
Long.MAX_VALUE, Spliterator.CONCURRENT|Spliterator.ORDERED) {
@Override
public boolean tryAdvance(Consumer<? super TypeRecord> action) {
if(!rowSet.next()) return false;
action.accept(typeMapper.mapRow(sqlRow));
return true;
}
};
return StreamSupport.stream(sp, true); //this boolean sets the stream as parallel
}

请注意,对于很多用例(例如本例),实现 Spliterator比实现 Iterator 更简单(无论如何都需要通过 spliteratorUnknownSize 进行包装)。此外,无需将此实例化封装到 Supplier 中。 .

最后一点,当前的实现对于大小未知的流表现不佳,因为它处理 Long.MAX_VALUE就像一个非常大的数字,忽略规范分配给它的“未知”语义。提供一个估计的大小对并行性能非常有利,事实上,在当前的实现中,它不需要精确,甚至是一个完全虚构的数字,比如 1000可能比正确使用 Long.MAX_VALUE 表现更好表示完全未知的大小。

关于java-8 - 并行流重复项目,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44161841/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com