gpt4 book ai didi

java - KStream 批处理窗口

转载 作者:行者123 更新时间:2023-12-01 09:35:13 27 4
gpt4 key购买 nike

我想用 KStream 接口(interface)批处理消息。

我有一个带有键/值的流
我试图在一个翻滚的窗口中收集它们,然后我想立即处理整个窗口。

builder.stream(longSerde, updateEventSerde, CONSUME_TOPIC)
.aggregateByKey(
HashMap::new,
(aggKey, value, aggregate) -> {
aggregate.put(value.getUuid, value);
return aggregate;
},
TimeWindows.of("intentWindow", 100),
longSerde, mapSerde)
.foreach((wk, values) -> {

问题是每次更新 KTable 时都会调用 foreach。
我想在完成后处理整个窗口。就像从 100 毫秒收集数据然后立即处理一样。在每个。
16:** - windows from 2016-08-23T10:56:26 to 2016-08-23T10:56:27, key 2016-07-21T14:38:16.288, value count: 294
16:** - windows from 2016-08-23T10:56:26 to 2016-08-23T10:56:27, key 2016-07-21T14:38:16.288, value count: 295
16:** - windows from 2016-08-23T10:56:26 to 2016-08-23T10:56:27, key 2016-07-21T14:38:16.288, value count: 296
16:** - windows from 2016-08-23T10:56:26 to 2016-08-23T10:56:27, key 2016-07-21T14:38:16.288, value count: 297
16:** - windows from 2016-08-23T10:56:26 to 2016-08-23T10:56:27, key 2016-07-21T14:38:16.288, value count: 298
16:** - windows from 2016-08-23T10:56:26 to 2016-08-23T10:56:27, key 2016-07-21T14:38:16.288, value count: 299
16:** - windows from 2016-08-23T10:56:27 to 2016-08-23T10:56:28, key 2016-07-21T14:38:16.288, value count: 1
16:** - windows from 2016-08-23T10:56:27 to 2016-08-23T10:56:28, key 2016-07-21T14:38:16.288, value count: 2
16:** - windows from 2016-08-23T10:56:27 to 2016-08-23T10:56:28, key 2016-07-21T14:38:16.288, value count: 3
16:** - windows from 2016-08-23T10:56:27 to 2016-08-23T10:56:28, key 2016-07-21T14:38:16.288, value count: 4
16:** - windows from 2016-08-23T10:56:27 to 2016-08-23T10:56:28, key 2016-07-21T14:38:16.288, value count: 5
16:** - windows from 2016-08-23T10:56:27 to 2016-08-23T10:56:28, key 2016-07-21T14:38:16.288, value count: 6

在某些时候,新窗口从 map 中的 1 个条目开始。
所以我什至不知道什么时候窗口是满的。

在 kafka 流中进行批处理的任何提示

最佳答案

我的实际任务是将更新从流推送到 redis,但我不想单独读取/更新/写入,即使 redis 很快。
我现在的解决方案是使用 KStream.process() 提供一个处理器,该处理器添加到进程队列并实际处理队列中的标点。

public class BatchedProcessor extends AbstractProcessor{

...
BatchedProcessor(Writer writer, long schedulePeriodic)

@Override
public void init(ProcessorContext context) {
super.init(context);
context.schedule(schedulePeriodic);
}

@Override
public void punctuate(long timestamp) {
super.punctuate(timestamp);
writer.processQueue();
context().commit();
}

@Override
public void process(Long aLong, IntentUpdateEvent intentUpdateEvent) {
writer.addToQueue(intentUpdateEvent);
}

我仍然需要测试,但它解决了我遇到的问题。人们可以很容易地以一种非常通用的方式编写这样的处理器。 API 非常整洁干净,但是一个 processBatched((List batchedMessaages)-> ..., timeInterval OR countInterval) 只使用标点符号来处理批处理并在此时提交并在 Store 中收集 KeyValues 可能是一个有用的补充。

但也许它的目的是使用处理器来解决这个问题,并将 API 保持在一条消息中,同时关注低延迟。

关于java - KStream 批处理窗口,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39104352/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com