- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有一个聚合在 KTable 上的拓扑。这是我创建的通用方法,用于根据我拥有的不同主题构建此拓扑。
public static <A, B, C> KTable<C, Set<B>> groupTable(KTable<A, B> table, Function<B, C> getKeyFunction,
Serde<C> keySerde, Serde<B> valueSerde, Serde<Set<B>> aggregatedSerde) {
return table
.groupBy((key, value) -> KeyValue.pair(getKeyFunction.apply(value), value),
Serialized.with(keySerde, valueSerde))
.aggregate(() -> new HashSet<>(), (key, newValue, agg) -> {
agg.remove(newValue);
agg.add(newValue);
return agg;
}, (key, oldValue, agg) -> {
agg.remove(oldValue);
return agg;
}, Materialized.with(keySerde, aggregatedSerde));
}
这在使用 Kafka 时效果很好,但在通过“TopologyTestDriver”进行测试时则不然。
在这两种情况下,当我获得更新时,首先调用减法器
,然后调用加法器
。问题在于,使用 TopologyTestDriver
时,会发送两条消息进行更新:一条消息在 subtractor
调用之后,另一条消息在 adder
调用之后称呼。更不用说在subrtractor
之后和adder
之前发送的消息处于不正确的阶段。
还有人可以确认这是一个错误吗?我已经针对 Kafka 2.0.1 和 2.1.0 版本进行了测试。
编辑:
我在github中创建了一个测试用例来说明这个问题:https://github.com/mulho/topology-testcase
最佳答案
有两条输出记录(一条“减”记录和一条“加”记录)是预期行为。理解它的工作原理有点困难,所以让我尝试解释一下。
假设您有以下输入表:
key | value
-----+---------
A | <10,2>
B | <10,3>
C | <11,4>
关于 KTable#groupBy()
您提取值的第一部分作为新键(即 10
或 11
),然后在聚合中对第二部分(即 2
、 3
、 4
)求和。因为A
和B
记录都有 10
作为新 key ,您将求和 2+3
你还会总结 4
换新 key 11
。结果表将是:
key | value
-----+---------
10 | 5
11 | 4
现在假设更新记录<B,<11,5>>
将原来的输入KTable修改为:
key | value
-----+---------
A | <10,2>
B | <11,5>
C | <11,4>
因此,新的结果表应该总结 5+4
对于 11
和2
对于 10
:
key | value
-----+---------
10 | 2
11 | 9
如果将第一个结果表与第二个结果表进行比较,您可能会注意到两行都已更新。老B|<10,3>
记录从 10|5
中减去导致 10|2
和新的B|<11,5>
记录已添加至11|4
导致 11|9
.
这正是您看到的两条输出记录。第一个输出记录(执行减法后)更新第一行(它减去不再属于聚合结果的旧值),而第二个记录将新值添加到聚合结果中。在我们的示例中,减法记录将为 <10,<null,<10,3>>>
添加记录将是 <11,<<11,5>,null>>
(这些记录的格式为<key, <plus,minus>>
(注意减法记录仅设置minus
部分,而加法记录仅设置plus
部分)。
最后一点:不能将正负记录放在一起,因为正负记录的键可能不同(在我们的示例中 11
和 10
),因此可能会进入不同的分区。这意味着加号和减号操作可能由不同的机器执行,因此不可能只发出一条同时包含加号和减号部分的记录。
关于apache-kafka - TopologyTestDriver 在 KTable 聚合上发送错误消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54372134/
我有一个 Cassandra 集群,里面有 4 个表和数据。 我想使用聚合函数(sum,max ...)发出请求,但我在这里读到这是不可能的: http://www.datastax.com/docu
我有以下两张表 Table: items ID | TITLE 249 | One 250 | Two 251 | Three 我投票给这些: Table: votes VID | IID | u
这个问题在这里已经有了答案: Update MongoDB field using value of another field (12 个答案) 关闭 3 年前。 我想根据另一个“源”集合的文档中
我的收藏包含以下文件。我想使用聚合来计算里面有多少客户,但我遇到了一些问题。我可以获得总行数,但不能获得总(唯一)客户。 [{ _id: "n001", channel: "Kalip
我有下表 Id Letter 1001 A 1001 H 1001 H 1001 H 1001 B 1001 H 1001 H 1001
得到一列的表 ABC。 “创建”的日期列。所以样本值就像; created 2009-06-18 13:56:00 2009-06-18 12:56:00 2009-06-17 14:02:0
我有一个带有数组字段的集合: {[ name:String buyPrice:Int sellPrice:Int ]} 我试图找到最低和最高买入/卖出价格。在某些条目中,买入或卖出价格为零
我有以下问题: 在我的 mongo db 中,我有以下结构: { "instanceId": "12", "eventId": "0-1b", "activityType":
下面给出的是我要在其上触发聚合查询的 Elasticsearch 文档。 { "id": 1, "attributes": [ { "fieldId": 1,
我正在使用 Django 的 aggregate query expression总计一些值。最终值是一个除法表达式,有时可能以零作为分母。如果是这种情况,我需要一种方法来逃避,以便它只返回 0。 我
我正在学习核心数据,特别是聚合。 当前我想要做的事情:计算表中在某些条件上具有逆关系的多对关系的记录数。 目前我正在这样做: NSExpression *ex = [NSExpression expr
我需要有关 Delphi 中的 ClientDatasets 的一些帮助。 我想要实现的是一个显示客户的网格,其中一列显示每个客户的订单数量。我将 ClientDataset 放在表单上并从 Delp
我的集合有 10M 个文档,并且有一个名为 movieId 的字段;该文档具有以下结构: { "_id" : ObjectId("589bed43e3d78e89bfd9b779"), "us
这个问题已经有答案了: What is the difference between association, aggregation and composition? (21 个回答) 已关闭 9
我在 elasticsearch 中有一些类似于这些示例的文档: { "id": ">", "list": [ "a", "b", "c" ] } { "id"
我正在做一些聚合。但是结果完全不是我所期望的,似乎它们没有聚合索引中与我的查询匹配的所有文档,在这种情况下 - 它有什么好处? 例如,首先我做这个查询: {"index":"datalayer","t
假设我在 ES 中有这些数据。 | KEY | value | |:-----------|------------:| | A |
可能在我的文档中,我有一个被分析的文本字段。我只是在ElasticSearch AggregationAPI中迷路了。我需要2种不同情况的支持: 情况A)结果是带有计数标记(条款)的篮子下降。 情况B
我正在为网上商店构建多面过滤功能,如下所示: Filter on Brand: [ ] LG (10) [ ] Apple (5) [ ] HTC (3) Filter on OS: [ ] Andr
我有一个父/子关系并且正在搜索 child 。 是否可以在父属性上创建聚合? 例如parent 是 POST,children 是 COMMENT。如果父项具有“类别”属性,是否可以搜索 COMMEN
我是一名优秀的程序员,十分优秀!