gpt4 book ai didi

apache-kafka-streams - 如何反转多对多关系?

转载 作者:行者123 更新时间:2023-12-04 01:56:54 24 4
gpt4 key购买 nike

我有一个压缩的 Kafka 主题,它是一个实体流,在我想要反转的多对多关系中具有该实体的最新表示。

一个例子是 Author 对象的主题,其中主题键是 Author.id (AAA),值是“Book”标识符值的数组:

"AAA" -> {"books": [456]}

Author 编写 ID 为 333 的新 Book 时,具有相同键的新事件将写入流中更新后的书单:

"AAA" -> {"books": [456, 333]}

Book 也有可能有多个 Authors,因此相同的 Book 标识符可能会出现在另一个事件中:

"BBB" -> {"books": [333, 555]}

我想使用 kafka 流将其反转为 Books -> [Author] 流,因此上述事件将导致如下结果:

456 -> {"authors": ["AAA"]}
333 -> {"authors": ["AAA", "BBB"]}
555 -> {"authors": ["BBB"]}

当我再次启动我的应用程序时,我希望恢复状态,这样如果我读入另一个 Author 记录,它会适本地反转关系。所以这个:

"CCC" -> {"books": [555]}

会知道 "BBB" 也是一个 Author 并且会发出更新的事件:

555 -> {"authors": ["BBB", "CCC"]}

我一直在关注 GlobalKTable,它在本地读取完整的主题状态,但无法弄清楚如何让它反转关系并将值聚合在一起。

如果可以的话,我想我可以将 GlobalKTable 与事件流结合起来,并获得每本 Book Author 的完整列表.

最佳答案

您不必使用 GlobakKTable 来实现您的要求。在 Kafka Streams 中,由更改 key 引起的内部数据重新分配会自动发生。例如:

orgKStream
.flatMapValues(books -> getBookList) (1)
.map((k,v) -> new KeyValue<>(v, k)) (2)
.groupByKey() (3)
.aggregate(//aggregate author list ) (4)
.toStream(// sink topic) (5)

(1) 将像下面这样更改您的原始主题。

<before>
"AAA" -> {"books": [456, 333]}
"BBB" -> {"books": [333, 555]}
<after>
"AAA" -> 456
"AAA" -> 333
"BBB" -> 333
"BBB" -> 555

(2) 将用值替换键。

<after>
456 -> "AAA"
333 -> "AAA"
333 -> "BBB"
555 -> "BBB"

(3) 和(4) 将聚合并生成 KTable(和状态存储)

<after>
456 -> {"authors": ["AAA"]}
333 -> {"authors": ["AAA", "BBB"]}
555 -> {"authors": ["BBB"]}

(5) 将表中的全部记录写入给定主题。

现在,您有一个新主题,其中包含作为键的书籍和作为值的作者列表。如果您想将整个结果放在一个地方,现在只需像下面这样创建 GlobalKTable。

StreamsBuilder.globalTable(<sink topic>)

如果 (2) 被调用 (map),然后 (3) 被调用 (groupByKey),将发生通过重新分区主题的内部数据重新分配。这意味着所有具有相同图书 ID 作为键的记录将被发布到内部重新分区主题的相同分区中。因此,您不会丢失任何聚合数据。

关于apache-kafka-streams - 如何反转多对多关系?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49972593/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com