gpt4 book ai didi

apache-kafka - TopologyTestDriver 在 KTable 聚合上发送错误消息

转载 作者:行者123 更新时间:2023-12-02 03:22:29 25 4
gpt4 key购买 nike

我有一个聚合在 KTable 上的拓扑。这是我创建的通用方法,用于根据我拥有的不同主题构建此拓扑。

public static <A, B, C> KTable<C, Set<B>> groupTable(KTable<A, B> table, Function<B, C> getKeyFunction,
Serde<C> keySerde, Serde<B> valueSerde, Serde<Set<B>> aggregatedSerde) {
return table
.groupBy((key, value) -> KeyValue.pair(getKeyFunction.apply(value), value),
Serialized.with(keySerde, valueSerde))
.aggregate(() -> new HashSet<>(), (key, newValue, agg) -> {
agg.remove(newValue);
agg.add(newValue);
return agg;
}, (key, oldValue, agg) -> {
agg.remove(oldValue);
return agg;
}, Materialized.with(keySerde, aggregatedSerde));
}

这在使用 Kafka 时效果很好,但在通过“TopologyTestDriver”进行测试时则不然。

在这两种情况下,当我获得更新时,首先调用减法器,然后调用加法器。问题在于,使用 TopologyTestDriver 时,会发送两条消息进行更新:一条消息在 subtractor 调用之后,另一条消息在 adder 调用之后称呼。更不用说在subrtractor之后和adder之前发送的消息处于不正确的阶段。

还有人可以确认这是一个错误吗?我已经针对 Kafka 2.0.1 和 2.1.0 版本进行了测试。

编辑:
我在github中创建了一个测试用例来说明这个问题:https://github.com/mulho/topology-testcase

最佳答案

有两条输出记录(一条“减”记录和一条“加”记录)是预期行为。理解它的工作原理有点困难,所以让我尝试解释一下。

假设您有以下输入表:

 key |  value
-----+---------
A | <10,2>
B | <10,3>
C | <11,4>

关于 KTable#groupBy()您提取值的第一部分作为新键(即 1011 ),然后在聚合中对第二部分(即 234 )求和。因为AB记录都有 10作为新 key ,您将求和 2+3你还会总结 4换新 key 11 。结果表将是:

 key |  value
-----+---------
10 | 5
11 | 4

现在假设更新记录<B,<11,5>>将原来的输入KTable修改为:

 key |  value
-----+---------
A | <10,2>
B | <11,5>
C | <11,4>

因此,新的结果表应该总结 5+4对于 112对于 10 :

 key |  value
-----+---------
10 | 2
11 | 9

如果将第一个结果表与第二个结果表进行比较,您可能会注意到两行都已更新。老B|<10,3>记录从 10|5 中减去导致 10|2和新的B|<11,5>记录已添加至11|4导致 11|9 .

这正是您看到的两条输出记录。第一个输出记录(执行减法后)更新第一行(它减去不再属于聚合结果的旧值),而第二个记录将新值添加到聚合结果中。在我们的示例中,减法记录将为 <10,<null,<10,3>>>添加记录将是 <11,<<11,5>,null>> (这些记录的格式为<key, <plus,minus>>(注意减法记录仅设置minus部分,而加法记录仅设置plus部分)。

最后一点:不能将正负记录放在一起,因为正负记录的键可能不同(在我们的示例中 1110 ),因此可能会进入不同的分区。这意味着加号和减号操作可能由不同的机器执行,因此不可能只发出一条同时包含加号和减号部分的记录。

关于apache-kafka - TopologyTestDriver 在 KTable 聚合上发送错误消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54372134/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com