gpt4 book ai didi

java - Kafka KStream——拓扑设计

转载 作者:行者123 更新时间:2023-11-30 07:59:29 25 4
gpt4 key购买 nike

我的流是键/值对,我想将其作为“原始”并通过 60 秒聚合保存到数据库中。最初我是这样做的:

                       ->foreach
/
kStreamBuilder.stream->aggregateBy->process

后来我发现

一个。 .aggregateby() 只返回它匹配的对(我需要所有的 - 匹配或其他)
b.我可以在 .process() 阶段使用 HashMap 实现相同的聚合效果。然后,当调用 .punctuate() 时,我将所有 k/v 对写入数据库。

因此生成的拓扑变为:

kStreamBuilder.stream->foreach
kStreamBuilder.stream->进程

问题:

  1. 这是获得所有 kv 对匹配或其他匹配结果的“合理”方法吗? (所有值通过 foreach 和任何对 + 其余通过 process)
  2. 在将原始流发送到 .foreach().process() 之前,我是否需要(以某种方式)划分原始流,或者是否足以执行上述操作?

最佳答案

DSL层的聚合是为“增量聚合”而设计的,即当前聚合结果加上要“添加”的单个新值。如果要一次访问 60 秒窗口的所有“原始记录”,则需要使用 Processor API。

如果您有两个下游运营商,则无需执行任何操作。记录将自动转发给双方。但是,请记住,它们不会被复制,即,对于每条记录,两个下游运算符(operator)都会看到相同的 Java 对象!

关于java - Kafka KStream——拓扑设计,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39319939/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com