gpt4 book ai didi

java - 加入 Kafka Streams 如何获取原始记录

转载 作者:行者123 更新时间:2023-11-29 04:09:56 24 4
gpt4 key购买 nike

我有一个正在运行的 Kafka Streams 应用程序,它目前正在从两个不同的主题创建两个 KStreams。那部分工作得很好。

现在,我想加入它们,并获得第一个值和第二个值的“聚合记录”。键是简单的 Java 字符串,值是 avro 编码的 GenericRecords。

根据文档,我应该能够做这样的事情:

    KStream<String, GenericAvroSerde> joined =
inputTopicStartKStream.leftJoin(inputTopicEndKStream,
(left, right) -> { ??? }
JoinWindows.of(Duration.ofHours(24)),
Joined.with(
stringSerde,
genericAvroSerde,
genericAvroSerde)
);

但是,我在网上找到的文档或教程并不清楚我在上面的 { ??? } 部分中可以做什么.我已经尝试了上述的多种变体,但没有运气。如果重要的话,我正在使用 Kakfa Streams 2.2.0 版本。

我只是想要一个 <key, merge value1 + value2> 的输出流,对于使用相同 key 在两个流上出现的记录。我可以手动合并值,但不清楚如何访问 lambda 右侧的值。

最佳答案

在 ValueJoiner (left, right) -> { ??? } , left 表示左流的值,right 表示右流的值

您所要做的就是在 ValueJoiner 中添加您的代码,如下所示:

import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericRecord;

KStream<String, GenericAvroSerde> joined =
inputTopicStartKStream.leftJoin(inputTopicEndKStream,
(left, right) -> {
// You can get access to the generic Avro record by
// casting both left and right values
Record leftRecord = (Record) left;
Record rightRecord = (Record) right;

// For the original question, you can simply create a new GenericRecord
// with the contents of left and right records
GenericRecord record = new GenericData.Record(schema);
record.put("left", left);
record.put("right", right);
}
JoinWindows.of(Duration.ofHours(24)),
Joined.with(
stringSerde,
genericAvroSerde,
genericAvroSerde)
);

关于java - 加入 Kafka Streams 如何获取原始记录,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55697220/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com