gpt4 book ai didi

apache-kafka - Kafka Streams - 更新 KTable 上的聚合

转载 作者:行者123 更新时间:2023-12-04 17:44:04 26 4
gpt4 key购买 nike

我有一个 KTable 数据看起来像这样(键 => 值),其中键是客户 ID,值是包含一些客户数据的小型 JSON 对象:

1 => { "name" : "John", "age_group":  "25-30"}
2 => { "name" : "Alice", "age_group": "18-24"}
3 => { "name" : "Susie", "age_group": "18-24" }
4 => { "name" : "Jerry", "age_group": "18-24" }

我想对此进行一些聚合 KTable ,并且基本上保持对每个 age_group 的记录数的计数.想要的 KTable 数据如下所示:
"18-24" => 3
"25-30" => 1

让我们说 Alice , 谁在 18-24上面的组,有一个生日,使她进入了新的年龄组。国营店后盾第一 KTable 现在应该是这样的:
1 => { "name" : "John", "age_group":  "25-30"}
2 => { "name" : "Alice", "age_group": "25-30"} # Happy Cake Day
3 => { "name" : "Susie", "age_group": "18-24" }
4 => { "name" : "Jerry", "age_group": "18-24" }

我想要结果聚合 KTable 结果反射(reflect)了这一点。例如
"18-24" => 2
"25-30" => 2

我可能过度概括了所描述的问题 here :

In Kafka Streams there is no such thing as a final aggregation... Depending on your use case, manual de-duplication would be a way to resolve the issue"



但到目前为止我只能计算一个运行总数,例如爱丽丝的生日将被解释为:
"18-24" => 3 # Old Alice record still gets counted here
"25-30" => 2 # New Alice record gets counted here as well

编辑:这是我注意到的一些额外行为,这似乎是出乎意料的。

我使用的拓扑如下所示:
dataKTable = builder.table("compacted-topic-1", "users-json")
.groupBy((key, value) -> KeyValue.pair(getAgeRange(value), key))
.count("age-range-counts")

1) 空状态

现在,从最初的空状态开始,一切看起来像这样:
compacted-topic-1
(empty)


dataKTable
(empty)


// groupBy()
Repartition topic: $APP_ID-age-range-counts-repartition
(empty)

// count()
age-range-counts state store
(empty)

2)发送几条消息

现在,让我们向 compacted-topic-1 发送消息,作为 流式传输KTable 以上。这是发生的事情:
compacted-topic-1
3 => { "name" : "Susie", "age_group": "18-24" }
4 => { "name" : "Jerry", "age_group": "18-24" }

dataKTable
3 => { "name" : "Susie", "age_group": "18-24" }
4 => { "name" : "Jerry", "age_group": "18-24" }


// groupBy()
// why does this generate 4 events???
Repartition topic: $APP_ID-age-range-counts-repartition
18-24 => 3
18-24 => 3
18-24 => 4
18-24 => 4

// count()
age-range-counts state store
18-24 => 0

所以我想知道:
  • 我正在尝试使用 Kafka Streams 0.10.1 或 0.10.2 做什至可能吗?我试过使用 groupBycount在 DSL 中,但也许我需要使用类似 reduce 的东西?
  • 另外,我在理解导致 add 的情况时遇到了一些麻烦。 reducer 和subtract reducer 被调用,所以任何关于这些点的任何澄清将不胜感激。
  • 最佳答案

    如果您有原件 KTable包含 id -> Json数据(我们称之为 dataKTable )你应该能够通过

    KTable countKTablePerRange
    = dataKTable.groupBy(/* map your age-range to be the key*/)
    .count("someStoreName");

    这应该适用于所有版本的 Kafka Streams API。

    更新

    关于重新分区主题中的 4 个值:这是正确的。对“base KTable”的每次更新都会为其“旧值”和“新值”写入一条记录。这是更新下游 KTable 所必需的正确。旧值必须从一个计数中删除,新值必须添加到另一个计数中。因为你的(数) KTable可能是分布式的(即,在多个并行运行的应用程序实例上共享),两个记录(旧的和新的)可能会在不同的实例中结束,因为它们可能有不同的键,因此它们必须作为两个独立的记录发送。 (不过,您在问题中显示的记录格式应该更复杂。)

    这也解释了为什么需要减法器和加法器。减法器从聚合结果中删除旧记录,而加法器将新记录添加到聚合结果中。

    仍然不确定为什么在结果中看不到正确的计数。你运行了多少实例?也许尝试禁用 KTable通过设置缓存 cache.max.bytes.buffering=0StreamsConfig .

    关于apache-kafka - Kafka Streams - 更新 KTable 上的聚合,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42685331/

    26 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com