gpt4 book ai didi

java - java中累积流

转载 作者:太空宇宙 更新时间:2023-11-04 13:07:55 25 4
gpt4 key购买 nike

最近我一直在尝试将我的数据解析器重新实现为 java 中的流,但我不知道如何做一件特定的事情:

考虑带有时间戳的对象 A。考虑由各种 A 对象组成的对象 B考虑一些可以告诉我们对象 B 的时间范围的指标。

我现在拥有的是一些带有状态的方法,它会遍历对象 A 的列表,如果它适合最后一个对象 B,它就会去那里,否则它会创建新的 B 实例并开始将对象 A 放在那里。

我想以流方式执行此操作

获取对象 A 的整个列表并将其作为流。现在我需要找出创建“ block ”并将它们累积到对象 B 中的函数。我该怎么做?
谢谢

编辑:

A 和 B 很复杂,但我会尝试在这里发布一些简化版本。

class A {
private final long time;
private A(long time) {
this.time = time;
}
long getTime() {
return time;
}
}

class B {
// not important, build from "full" temporaryB class
// result of accumulation
}

class TemporaryB {
private final long startingTime;
private int counter;

public TemporaryB(A a) {
this.startingTime = a.getTime();
}

boolean fits(A a) {
return a.getTime() - startingTime < THRESHOLD;
}

void add(A a) {
counter++;
}
}

class Accumulator {
private List<B> accumulatedB;
private TemporaryBParameters temporaryBParameters
public void addA(A a) {
if(temporaryBParameters.fits(a)) {
temporaryBParameters.add(a)
} else {
accumulateB.add(new B(temporaryBParameters)
temporaryBParameters = new TemporaryBParameters(a)
}
}
}

好的,这是非常简单的方法,我现在该怎么做。我不喜欢它。它很丑。

最佳答案

一般来说,此类问题不太适合 Stream API,因为您可能需要非本地知识,这使得并行处理变得更加困难。想象一下,您有 new A(1)new A(2)new A(3) 等,直到 new A(1000),且 Threshold 设置为 10。所以你基本上需要将输入按 10 个元素合并成批处理。这里我们遇到了与讨论的相同的问题in this answer :当我们将任务拆分为子任务时,后缀部分可能不知道前缀部分中有多少个元素,因此在处理整个前缀之前,它甚至无法开始将数据合并为批处理。您的问题本质上是串行的。

另一方面,new headTail 提供了一个解决方案我的方法StreamEx图书馆。该方法的并行性很差,但是有了它,您可以在短短几行内定义几乎任何操作。

以下是如何使用 headTail 解决您的问题:

static StreamEx<TemporaryB> combine(StreamEx<A> input, TemporaryB tb) {
return input.headTail((head, tail) ->
tb == null ? combine(tail, new TemporaryB(head)) :
tb.fits(head) ? combine(tail, tb.add(head)) :
combine(tail, new TemporaryB(head)).prepend(tb),
() -> StreamEx.ofNullable(tb));
}

这里我这样修改了您的TemporaryB方法:

TemporaryB add(A a) {
counter++;
return this;
}

示例(假设阈值 = 1000):

List<A> input = Arrays.asList(new A(1), new A(10), new A(1000), new A(1001), new A(
1002), new A(1003), new A(2000), new A(2002), new A(2003), new A(2004));

Stream<B> streamOfB = combine(StreamEx.of(input), null).map(B::new);
streamOfB.forEach(System.out::println);

输出(我写了简单的B.toString()):

B [counter=2, startingTime=1]
B [counter=3, startingTime=1001]
B [counter=2, startingTime=2002]

所以这里实际上有一个 B 的惰性 Stream

<小时/>

说明:

StreamEx.headTail 参数是两个 lambda。当输入流非空时,First 最多调用一次。它接收第一个流元素(头)和包含所有其他元素的流(尾)。当输入流为空且不接收任何参数时,第二个最多调用一次。两者都应该产生一个将被使用的输出流。所以我们这里有:

return input.headTail((head, tail) ->

tb == null 是起始情况,从 head 创建新的 TemporaryB 并使用 tail 调用 self:

    tb == null ? combine(tail, new TemporaryB(head)) :

tb.fits(head) ?好的,只需将 head 添加到现有的 tb 中,并使用 tail 调用 self:

        tb.fits(head) ? combine(tail, tb.add(head)) :

否则再次创建新的TemporaryB(head),但也要在输出流前面添加当前的tb(实际上将新元素发送到目标流中):

            combine(tail, new TemporaryB(head)).prepend(tb), 

输入流已耗尽?好的,返回最后收集的tb(如果有):

    () -> StreamEx.ofNullable(tb));

请注意,headTail 实现保证了这种解决方案在递归时不会消耗超过常量的堆栈和堆。如果您有疑问,可以在数千个输入元素上进行检查:

Stream<B> streamOfB = combine(LongStreamEx.range(100000).mapToObj(A::new), null).map(B::new);
streamOfB.forEach(System.out::println);

关于java - java中累积流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34252073/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com