gpt4 book ai didi

java-8 - 为什么带有副作用的过滤器比基于Spliterator的实现方式更好?

转载 作者:行者123 更新时间:2023-12-04 12:28:24 26 4
gpt4 key购买 nike

关于How to skip even lines of a Stream obtained from the Files.lines问题,我遵循了公认的答案方法,基于filterEven()接口(interface)实现了自己的Spliterator<T>方法,例如:

public static <T> Stream<T> filterEven(Stream<T> src) {
Spliterator<T> iter = src.spliterator();
AbstractSpliterator<T> res = new AbstractSpliterator<T>(Long.MAX_VALUE, Spliterator.ORDERED)
{
@Override
public boolean tryAdvance(Consumer<? super T> action) {
iter.tryAdvance(item -> {}); // discard
return iter.tryAdvance(action); // use
}
};
return StreamSupport.stream(res, false);
}

我可以通过以下方式使用:
Stream<DomainObject> res = Files.lines(src)
filterEven(res)
.map(line -> toDomainObject(line))

但是,相对于使用带有副作用的 filter()的下一种方法来衡量这种方法的性能,我注意到下一种方法的效果更好:
final int[] counter = {0};
final Predicate<String> isEvenLine = item -> ++counter[0] % 2 == 0;
Stream<DomainObject> res = Files.lines(src)
.filter(line -> isEvenLine ())
.map(line -> toDomainObject(line))

我使用JMH测试了性能,但没有在基准测试中包含文件负载。我以前将其加载到数组中。然后,每个基准测试都从先前的数组创建 Stream<String>开始,然后过滤偶数行,然后应用 mapToInt()提取 int字段的值,最后是 max()操作。这是基准测试之一(您可以检查整个 Program here,在这里您有 data file with about 186 lines):
@Benchmark
public int maxTempFilterEven(DataSource src){
Stream<String> content = Arrays.stream(src.data)
.filter(s-> s.charAt(0) != '#') // Filter comments
.skip(1); // Skip line: Not available
return filterEven(content) // Filter daily info and skip hourly
.mapToInt(line -> parseInt(line.substring(14, 16)))
.max()
.getAsInt();
}

我不明白为什么 filter()方法具有比 filterEven()(〜50ops/ms)更好的性能(〜80ops/ms)?

最佳答案

简介

我想我知道原因,但是不幸的是,我不知道如何提高基于Spliterator的解决方案的性能(至少在不重写整个Streams API功能的情况下)。

旁注1 :设计Stream API时,性能并不是最重要的设计目标。如果性能至关重要,则最有可能在不使用Stream API的情况下重新编写代码将使代码更快。 (例如,Stream API不可避免地会增加内存分配,从而增加GC压力)。另一方面,在大多数情况下,Stream API以相对较小的性能下降为代价提供了更好的高级API。

部分 1 理论上的简短回答
Stream旨在实现一种内部迭代作为消耗的主要方式,而外部迭代(即基于Spliterator的方式)是一种“模拟”的附加手段。因此,外部迭代会涉及一些开销。懒惰对外部迭代的效率增加了一些限制,并且由于需要支持flatMap,因此有必要在此过程中使用某种动态缓冲区。

旁注2 在某些情况下,基于Spliterator的迭代可能与内部迭代一样快(在这种情况下为filter)。在直接从包含数据的Spliterator直接创建Stream的情况下尤其如此。要查看它,可以修改测试以将第一个过滤器具体化为String s数组:

String[] filteredData = Arrays.stream(src.data)
.filter(s-> s.charAt(0) != '#') // Filter comments
.skip(1)
.toArray(String[]::new);

然后比较 maxTempFiltermaxTempFilterEven的性能,以接受该预先过滤的 String[] filteredData。如果您想知道为什么会这样,您可能应该阅读本长答案的其余部分,或者至少阅读第2部分。

部分 2 更长的理论答案:

流的设计主要是通过某些终端操作将其整体消耗掉。尽管被支持,但一个接一个的迭代元素并不是设计使用流的主要方式。

请注意,使用“功能性” Stream API(例如 mapflatMapfilterreducecollect),您不能在某个步骤中说“我已经有足够的数据,不再遍历源并推送值”。您可以丢弃某些传入数据(就像 filter一样),但不能停止迭代。 ( takeskip转换实际上是在内部使用 Spliterator来实现的; anyMatchallMatchnoneMatchfindFirstfindAny等使用非公共(public)API j.u.s.Sink.cancellationRequested,它们也更容易使用,因为不能进行多个终端操作。)如果管道中的所有转换都是同步的,则可以将它们组合为一个聚合函数( Consumer),然后在一个简单的循环中调用它(可以选择将循环执行拆分为多个线程)。这就是我简化的基于状态的过滤器所代表的含义(请参见 中的代码,向我显示一些代码部分)。如果管道中有一个 flatMap,它会变得更加复杂,但是思路仍然相同。

基于 Spliterator的转换从根本上有所不同,因为它向管道增加了异步的,由消费者驱动的步骤。现在, Spliterator而不是源 Stream驱动了迭代过程。如果您直接在源 Spliterator上请求 Stream,它也许可以返回一些实现,该实现只需对其内部数据结构进行迭代,这就是为什么实现预先过滤的数据应消除性能差异的原因。但是,如果您为某些非空管道创建 Spliterator,则除了要求源将元素逐一推送通过管道,直到某个元素通过所有过滤器之外,没有其他(简单)的选择(另请参见示例2中的第二个示例)。 ,向我显示一些部分代码)。源元素被逐个而不是分批推送的事实是使 Stream变得懒惰的基本决定的结果。需要缓冲区而不是仅一个元素是对 flatMap的支持的结果:从源中推送一个元素可以为 Spliterator生成许多元素。

部分 3 向我显示一些代码

本部分试图为“理论”部分中所描述的代码提供支持(包括链接到实际代码和模拟代码)。

首先,您应该知道当前的Streams API实现将非终端(中间)操作累积到单个惰性管道中(请参阅 j.u.s.AbstractPipeline及其子类(例如 j.u.s.ReferencePipeline)。然后,当应用了终端操作时,原始操作中的所有元素 Stream通过管道“推送”。

您看到的是两件事的结果:
  • 在您遇到的情况下,流管道不同的事实
    里面有一个基于Spliterator的步骤。
  • 您的OddLines不是
  • 管道中的第一步

    具有状态过滤器的代码或多或少类似于以下简单代码:
    static int similarToFilter(String[] data)
    {
    final int[] counter = {0};
    final Predicate<String> isEvenLine = item -> ++counter[0] % 2 == 0;
    int skip = 1;

    boolean reduceEmpty = true;
    int reduceState = 0;
    for (String outerEl : data)
    {
    if (outerEl.charAt(0) != '#')
    {
    if (skip > 0)
    skip--;
    else
    {
    if (isEvenLine.test(outerEl))
    {
    int intEl = parseInt(outerEl.substring(14, 16));
    if (reduceEmpty)
    {
    reduceState = intEl;
    reduceEmpty = false;
    }
    else
    {
    reduceState = Math.max(reduceState, intEl);
    }
    }
    }
    }
    }
    return reduceState;
    }

    请注意,这实际上是一个内部包含一些计算(过滤/转换)的循环。

    另一方面,当您在管道中添加 Spliterator时,情况会发生很大变化,即使使用与实际发生的情况基本相似的简化代码,它也会变得更大,例如:
    interface Sp<T>
    {
    public boolean tryAdvance(Consumer<? super T> action);
    }

    static class ArraySp<T> implements Sp<T>
    {
    private final T[] array;
    private int pos;

    public ArraySp(T[] array)
    {
    this.array = array;
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action)
    {
    if (pos < array.length)
    {
    action.accept(array[pos]);
    pos++;
    return true;
    }
    else
    {
    return false;
    }
    }
    }

    static class WrappingSp<T> implements Sp<T>, Consumer<T>
    {
    private final Sp<T> sourceSp;
    private final Predicate<T> filter;

    private final ArrayList<T> buffer = new ArrayList<T>();
    private int pos;


    public WrappingSp(Sp<T> sourceSp, Predicate<T> filter)
    {
    this.sourceSp = sourceSp;
    this.filter = filter;
    }

    @Override
    public void accept(T t)
    {
    buffer.add(t);
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action)
    {
    while (true)
    {
    if (pos >= buffer.size())
    {
    pos = 0;
    buffer.clear();
    sourceSp.tryAdvance(this);
    }
    // failed to fill buffer
    if (buffer.size() == 0)
    return false;

    T nextElem = buffer.get(pos);
    pos++;
    if (filter.test(nextElem))
    {
    action.accept(nextElem);
    return true;
    }
    }
    }
    }

    static class OddLineSp<T> implements Sp<T>, Consumer<T>
    {
    private Sp<T> sourceSp;

    public OddLineSp(Sp<T> sourceSp)
    {
    this.sourceSp = sourceSp;
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action)
    {
    if (sourceSp == null)
    return false;

    sourceSp.tryAdvance(this);
    if (!sourceSp.tryAdvance(action))
    {
    sourceSp = null;
    }
    return true;

    }

    @Override
    public void accept(T t)
    {

    }
    }

    static class ReduceIntMax
    {
    boolean reduceEmpty = true;
    int reduceState = 0;

    public int getReduceState()
    {
    return reduceState;
    }

    public void accept(int t)
    {
    if (reduceEmpty)
    {
    reduceEmpty = false;
    reduceState = t;
    }
    else
    {
    reduceState = Math.max(reduceState, t);
    }
    }
    }


    static int similarToSpliterator(String[] data)
    {
    ArraySp<String> src = new ArraySp<>(data);

    int[] skip = new int[1];
    skip[0] = 1;
    WrappingSp<String> firstFilter = new WrappingSp<String>(src, (s) ->
    {
    if (s.charAt(0) == '#')
    return false;
    if (skip[0] != 0)
    {
    skip[0]--;
    return false;
    }
    return true;
    });
    OddLineSp<String> oddLines = new OddLineSp<>(firstFilter);
    final ReduceIntMax reduceIntMax = new ReduceIntMax();
    while (oddLines.tryAdvance(s ->
    {
    int intValue = parseInt(s.substring(14, 16));
    reduceIntMax.accept(intValue);
    })) ; // do nothing in the loop body
    return reduceIntMax.getReduceState();
    }

    此代码较大,因为如果在循环内没有一些非平凡的有状态回调,则无法(或至少很难)表示逻辑。这里的 Sp接口(interface)是 j.u.s.Streamj.u.Spliterator接口(interface)的混合体。
  • ArraySp表示Arrays.stream的结果。
  • WrappingSpj.u.s.StreamSpliterators.WrappingSpliterator相似,后者在实际代码中表示针对任何非空管道的Spliterator接口(interface)的实现,即,至少应用了一个中间操作的Stream(请参阅j.u.s.AbstractPipeline.spliterator method)。在我的代码中,我将其与StatelessOp子类合并,并放置负责filter方法实现的逻辑。同样为了简单起见,我使用skip实现了filter
  • OddLineSp对应于您的OddLines及其产生的Stream
  • ReduceIntMax表示ReduceOpsMath.max终端操作int

  • 那么在此示例中重要的是什么?这里重要的是,因为您首先过滤了原始流,所以 OddLineSp是根据非空管道(即 WrappingSp)创建的。而且,如果您仔细观察 WrappingSp,您会注意到每次 tryAdvance都被调用时,它将调用委托(delegate)给 sourceSp并将结果累积到 buffer中。而且,由于管道中没有 flatMap,因此 buffer的元素将被一一复制。 IE。每次调用 WrappingSp.tryAdvance时,它将调用 ArraySp.tryAdvance,准确地返回一个元素(通过回调),并将其进一步传递给调用方提供的 consumer(除非该元素与过滤器不匹配,在这种情况下将再次调用 ArraySp.tryAdvance)再一次,但 buffer永远不会一次填充多个元素)。

    旁注3 :如果要查看实际代码,则最有趣的地方是 j.u.s.StreamSpliterators. WrappingSpliterator.tryAdvance ,它调用
    j.u.s.StreamSpliterators. AbstractWrappingSpliterator.doAdvance 依次调用 j.u.s.StreamSpliterators. AbstractWrappingSpliterator.fillBuffer j.u.s.StreamSpliterators. pusher 依次调用在ojit_a初始化的 WrappingSpliterator.initPartialTraversalState
    因此,影响性能的主要因素是复制到缓冲区中。
    对于我们这些普通的Java开发人员来说,不幸的是,Stream API的当前实现几乎是封闭的,您不能仅使用继承或组合来修改内部行为的某些方面。
    您可以使用基于反射的黑客手段,使针对特定情况的复制到缓冲区的效率更高,并获得一定的性能(但是牺牲了 Stream的惰性),但是您不能完全避免这种复制,因此基于 Spliterator的代码将是反正比较慢

    回到第2条 旁注的示例,基于 Spliterator的包含具体化 filteredData的测试工作得更快,因为在 WrappingSp之前的管道中没有 OddLineSp,因此不会复制到中间缓冲区中。

    关于java-8 - 为什么带有副作用的过滤器比基于Spliterator的实现方式更好?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43809885/

    26 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com