gpt4 book ai didi

java - 热衷于在 Java 中使用 Kafka Streams 创建差异流?

转载 作者:行者123 更新时间:2023-12-02 03:35:33 25 4
gpt4 key购买 nike

我正在尝试从 Kafka Java 中的 KStream 创建“差异”流。

我有一个输入流,其中的值是一组 double V0 … Vn。输出流应计算 V0 - 0、V1 - V0、V2 - V1 … Vn -Vn-1 之间的差。

我的第一个想法是做这样的事情:

    KStream<String, Double> stream = builder.stream(TOPIC)

KTable<String, Double> difference = stream.groupByKey().reduce(
(oldValue, newValue) -> {
return newValue - oldValue
}
).toStream()

假设我有一个具有以下值的 KStream 输入:

Key  -> Value
"A1" -> 2
"B2" -> 4
"A1" -> 6
"A1" -> 10
"B2" -> 13
"A1" -> 7

我想使用以下值创建一个新的流输出:

Key  -> Value
"A1" -> 2 (2-0 = 2)
"B2" -> 4 (4-0 = 4)
"A1" -> 4 (6-2 = 4)
"A1" -> 4 (10-6 = 4)
"B2" -> 9 (13-4 = 9)
"A1" -> -3 (7-10 = -3)

最佳答案

你可以使用类似的东西

        stream.groupByKey().aggregate(Diff::new, new Aggregator<String, Double, Diff>() {

@Override
public Diff apply(String key, Double newValue, Diff aggregate) {
Double difference = newValue - aggregate.getLastValue();
aggregate.setDifference(difference);
aggregate.setLastValue(newValue);
return aggregate;
}
}).mapValues(new ValueMapper<Diff, Double>() {

@Override
public Double apply(Diff value) {
return value.getDifference();
}

}).toStream().to("diff");

哪里

public class Diff {

private Double lastValue = 0d;

private Double difference = 0d;
//getters and setters
// ...
}

关于java - 热衷于在 Java 中使用 Kafka Streams 创建差异流?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56864385/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com