gpt4 book ai didi

java - Kafka Streams 分组和串联

转载 作者:行者123 更新时间:2023-11-29 08:23:22 25 4
gpt4 key购买 nike

我有一个接收记录的 Kafka 流,我想根据特定字段连接消息。

流中的消息如下所示:

Key: 2099
Payload{
email: tom@emample.com
eventCode: 2099
}

预期输出:

key: 2099
Payload{
emails: tom@example, bill@acme.com, jane@example.com
}

我可以让流正常运行,我只是不确定 lamda 应该包含什么。

这是我到目前为止所做的。我不确定我是否应该使用 map、aggregate 或 reduce 或这些操作的组合。

final StreamsBuilder builder = new StreamsBuilder();
KStream<String, Payload> inputStream = builder.stream(INPUT_TOPIC);

inputStream
.groupByKey()
.windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(300000)))

// Not sure what to do here …..

}).to (OUTPUT_TOPIC );

最佳答案

可能是这样的

inputStream.groupByKey().windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(300000)))
.aggregate(PayloadAggr::new, new Aggregator<String, Payload, PayloadAggr>() {
@Override
public PayloadAggr apply(String key, Payload newValue, PayloadAggr result) {
result.setKey(key);
if(result.getEmails()==null){
result.setEmails(newValue.getEmail());
}else{
result.setEmails(result.getEmails() + "," + newValue.getEmail());
}
return result;
}
}, .../* You serdes and store */}).toStream().to(OUTPUT_TOPIC);

关于java - Kafka Streams 分组和串联,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55497032/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com