gpt4 book ai didi

apache-kafka - 如何让 Kafka Streams 每 1 小时窗口每个键发送一条记录?

转载 作者:行者123 更新时间:2023-12-04 17:35:25 25 4
gpt4 key购买 nike

我正在编写 Kafka Streams 应用程序。它执行以下步骤"1)消费输入数据2) 在 1 小时窗口内根据新 key 对记录进行重复数据删除3)重新选择 key 4) 计算 1 小时窗口内的 key 5) 发送给下游。

我是 Kafka Streams 的新手。我的理解是,为了将窗口保持为 1 小时,我将 commit.interval.ms 也设置为 1 小时。这是正确的做法吗?

一旦我用真实流量部署我的应用程序,该应用程序似乎一直在发送消息,而我认为它每小时只会发送一堆消息?

感谢任何帮助!!

我的配置:

commit.interval.ms = 3600000
request.timeout.ms = 600000
retries = 20
retry.backoff.ms = 1000
cache.max.bytes.buffering = 10485760

// dedupe by new key per window(1hr)
stream = inputStream
.selectKey(... )
.groupByKey()
.windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(60)))
// only keep the latest event for each customized key
.reduce((event1, event2) -> event2)
.toStream()
.groupByKey()
.windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(60)))
.reduce((event1, event2) -> {
long count1 = event1.getCount();
long count2 = event2.getCount();
event2.setCount(count1 + count2);
return event2;
})
.toStream()
.to(OUTPUT_TOPIC);

最佳答案

I'm new to Kafka Streams. My understanding is, in order to keep the window as 1 hr, I set the commit.interval.ms to be 1hr as well. Is this the right thing to do?

提交间隔与你的处理逻辑无关。

您可能想查看 suppress() 运算符。此外,以下 block 帖子可能会有所帮助:

Kafka Streams 的处理模型是连续的,它默认发送连续的结果更新。这就是为什么基本上每个输入消息都会得到一个输出消息,因为处理输入消息会修改结果。

关于apache-kafka - 如何让 Kafka Streams 每 1 小时窗口每个键发送一条记录?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56840580/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com