gpt4 book ai didi

java - RxJava- Group、Emit 和 Zip Sorted "Chunks"具有共同属性?

转载 作者:搜寻专家 更新时间:2023-11-01 03:20:37 28 4
gpt4 key购买 nike

我和我的同事经常遇到一个挑战,我希望响应式编程能够解决它。它可能需要我自己实现 OperatorTransformer尽管。

我想拿任何Observable<T>发射 T项,但我希望运算符(operator)将它们分组到 T 的映射中并将每个分组作为 List<T> 发出,甚至更好的一些通用累加器很像 Collector来自 Java 8 流。

但这是棘手的部分,我认为这不是 groupBy()可以做。我想通过这个运算符获取两个 Observables,并假设发出的项目在该属性上排序(传入数据将从排序的 SQL 查询发出并映射到 T 对象)。运算符将连续累积项目,直到属性更改,然后它发出该组并移动到下一个。这样我就可以从每个 Observable 中获取每组数据,压缩并处理这两个 block ,然后将它们扔掉并继续下一个。这样我就可以保持半缓冲状态并保持低内存使用率。

因此,如果我在 PARTITION_ID 上进行排序、分组和压缩,这在视觉上就是我想要完成的。

enter image description here

我这样做只是因为我可能有两个查询,每个查询都超过一百万条记录,而且我需要并排进行复杂的比较。我没有足够的内存来一次从两侧导入所有数据,但我可以将其范围缩小到每个排序的属性值并将其分成几批。在每批之后,GC 将丢弃它,Operator 可以继续处理下一批。

这是我目前的代码,但我不太清楚如何继续,因为我不想在批处理完成之前发出任何东西。我该怎么做?

public final class  SortedPartitioner<T,P,C,R> implements Transformer<T,R> {

private final Function<T,P> mappedPartitionProperty;
private final Supplier<C> acculatorSupplier;
private final BiConsumer<T,R> accumulator;
private final Function<C,R> finalResult;


private SortedPartitioner(Function<T, P> mappedPartitionProperty, Supplier<C> acculatorSupplier,
BiConsumer<T, R> accumulator, Function<C, R> finalResult) {
this.mappedPartitionProperty = mappedPartitionProperty;
this.acculatorSupplier = acculatorSupplier;
this.accumulator = accumulator;
this.finalResult = finalResult;
}
public static <T,P,C,R> SortedPartitioner<T,P,C,R> of(
Function<T,P> mappedPartitionProperty,
Supplier<C> accumulatorSupplier,
BiConsumer<T,R> accumulator,
Function<C,R> finalResult) {

return new SortedPartitioner<>(mappedPartitionProperty, accumulatorSupplier, accumulator, finalResult);

}
@Override
public Observable<R> call(Observable<T> t) {
return null;
}

}

最佳答案

您的另一个答案使用 Maven Central 上的库并且更短。

将此依赖项添加到您的 pom.xml

<dependency>
<groupId>com.github.davidmoten</groupId>
<artifactId>rxjava-extras</artifactId>
<version>0.5.13</version>
</dependency>

在对具有相同 partition_id 的项目进行分组方面,请执行以下操作:

import com.github.davidmoten.rx.Transformers;

Observable<List<Item>> grouped = items.compose(
Transformers.toListWhile(
(list, item) -> list.isEmpty() || list.get(0).partitionId == item.partitionId));

此方法的测试非常全面(另请参阅 Transformers.collectWhile 了解列表以外的数据结构),但您可以在 github 上自行查看源代码.

然后继续zip

关于java - RxJava- Group、Emit 和 Zip Sorted "Chunks"具有共同属性?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31523884/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com