gpt4 book ai didi

Java Streams - 缓冲巨大的流

转载 作者:行者123 更新时间:2023-12-01 11:47:40 25 4
gpt4 key购买 nike

我试图将多个由大量数据支持的流合并为一个,然后缓冲它们。我可以毫无问题地将这些流折叠成一个项目流。但是,当我尝试缓冲/分块流时,它会尝试完全缓冲第一个流,这会立即填满我的内存。

我花了一段时间将问题缩小到最小测试用例,但下面有一些代码。

我可以重构一些东西,这样我就不会遇到这个问题,但是在不理解为什么会爆炸的情况下,我觉得使用流只是一个定时炸弹。

我从 Buffer Operator on Java 8 Streams 那里获得了缓冲的灵感。

import java.util.*;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class BreakStreams
{

//@see https://stackoverflow.com/questions/47842871/buffer-operator-on-java-8-streams
/**
* Batch a stream into chunks
*/
public static <T> Stream<List<T>> buffer(Stream<T> stream, final long count)
{
final Iterator<T> streamIterator = stream.iterator();

return StreamSupport.stream(Spliterators.spliteratorUnknownSize(new Iterator<List<T>>()
{
@Override public boolean hasNext()
{
return streamIterator.hasNext();
}

@Override public List<T> next()
{
List<T> intermediate = new ArrayList<>();
for (long v = 0; v < count && hasNext(); v++)
{
intermediate.add(streamIterator.next());
}
return intermediate;
}
}, 0), false);
}

public static void main(String[] args)
{

//create streams from huge datasets
Stream<Long> streams = Stream.of(LongStream.range(0, Integer.MAX_VALUE).boxed(),
LongStream.range(0, Integer.MAX_VALUE).boxed())
//collapse into one stream
.flatMap(x -> x);
//iterating over the stream one item at a time is OK..
// streams.forEach(x -> {

//buffering the stream is NOT ok, you will go OOM
buffer(streams, 25).forEach(x -> {
try
{
Thread.sleep(2500);
}
catch (InterruptedException ignore)
{
}
System.out.println(x);
});
}
}

最佳答案

这似乎与旧问题“Why filter() after flatMap() is "not completely" lazy in Java streams? ”有关。虽然该问题已针对 Stream 的内置操作修复,但当我们尝试从外部迭代平面映射流时,它似乎仍然存在。

我们可以简化代码来重现问题

Stream.of(LongStream.range(0, Integer.MAX_VALUE))
.flatMapToLong(x -> x)
.iterator().hasNext();

请注意,使用 Spliterator 也会受到影响
Stream.of(LongStream.range(0, Integer.MAX_VALUE))
.flatMapToLong(x -> x)
.spliterator()
.tryAdvance((long l) -> System.out.println("first item: "+l));

两者都尝试缓冲元素,直到最终使用 OutOfMemoryError 退出。

由于 spliterator().forEachRemaining(…) 似乎没有受到影响,您可以实现一个适用于您的 forEach 用例的解决方案,但它会很脆弱,因为它仍然会出现短路流操作的问题。
public static <T> Stream<List<T>> buffer(Stream<T> stream, final int count) {
boolean parallel = stream.isParallel();
Spliterator<T> source = stream.spliterator();
return StreamSupport.stream(
new Spliterators.AbstractSpliterator<List<T>>(
(source.estimateSize()+count-1)/count, source.characteristics()
&(Spliterator.SIZED|Spliterator.DISTINCT|Spliterator.ORDERED)
| Spliterator.NONNULL) {
List<T> list;
Consumer<T> c = t -> list.add(t);
@Override
public boolean tryAdvance(Consumer<? super List<T>> action) {
if(list == null) list = new ArrayList<>(count);
if(!source.tryAdvance(c)) return false;
do {} while(list.size() < count && source.tryAdvance(c));
action.accept(list);
list = null;
return true;
}
@Override
public void forEachRemaining(Consumer<? super List<T>> action) {
source.forEachRemaining(t -> {
if(list == null) list = new ArrayList<>(count);
list.add(t);
if(list.size() == count) {
action.accept(list);
list = null;
}
});
if(list != null) {
action.accept(list);
list = null;
}
}
}, parallel);
}

但请注意,基于 Spliterator 的解决方案通常更可取,因为它们支持携带支持优化的附加信息,并且在许多用例中具有较低的迭代成本。因此,一旦在 JDK 代码中修复了此问题,这就是要走的路。

作为一种解决方法,您可以使用 Stream.concat(…) 来组合流,但它有一个明确的警告,即不要在 its documentation 中一次组合太多流:

Use caution when constructing streams from repeated concatenation. Accessing an element of a deeply concatenated stream can result in deep call chains, or even StackOverflowException [sic].



throwable 的名称已在 Java 9 的文档中更正为 StackOverflowError

关于Java Streams - 缓冲巨大的流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61114380/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com