
apache-kafka-streams - Kafka Streams API: KStream to KTable


I have a Kafka topic to which I send location events (key=user_id, value=user_location). I can read and process it as a KStream:

KStreamBuilder builder = new KStreamBuilder();

KStream<String, Location> locations = builder
        .stream("location_topic")
        .map((k, v) -> {
            // some processing here, omitted for clarity
            Location location = new Location(lat, lon);
            return new KeyValue<>(k, location);
        });

This works fine, but I would like a KTable with the last known location of each user. How can I do that?

I was able to do it by writing to and reading from an intermediate topic:
// write to intermediate topic
locations.to(Serdes.String(), new LocationSerde(), "location_topic_aux");

// build KTable from intermediate topic
KTable<String, Location> table = builder.table("location_topic_aux", "store");
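
For reference, the two-serde to() and table() overloads above come from the pre-1.0 API; in newer Kafka Streams releases (2.0+) the same round trip is written with Produced/Consumed. A minimal sketch, assuming builder is a StreamsBuilder and LocationSerde is the custom serde from above:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

// write to intermediate topic (newer DSL)
locations.to("location_topic_aux",
        Produced.with(Serdes.String(), new LocationSerde()));

// build KTable from intermediate topic, materialized as the store "store"
KTable<String, Location> table = builder.table("location_topic_aux",
        Consumed.with(Serdes.String(), new LocationSerde()),
        Materialized.as("store"));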

Is there a simpler way to obtain a KTable from a KStream? This is my first application using Kafka Streams, so I may be missing something obvious.

Best Answer

Update:

In Kafka 2.5, a new method KStream#toTable() will be added that provides a convenient way to transform a KStream into a KTable. For details see: https://cwiki.apache.org/confluence/display/KAFKA/KIP-523%3A+Add+KStream%23toTable+to+the+Streams+DSL
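
With that method the conversion becomes a one-liner. A minimal sketch assuming Kafka Streams 2.5+, reusing Location and LocationSerde from the question (the store name here is arbitrary):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;

StreamsBuilder builder = new StreamsBuilder();

KStream<String, Location> locations = builder.stream("location_topic",
        Consumed.with(Serdes.String(), new LocationSerde()));

// Kafka 2.5+: interpret the stream as a changelog; the latest value per key
// wins, which yields exactly "last known location per user".
KTable<String, Location> table =
        locations.toTable(Materialized.as("latest-location-store"));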

Original answer:

There is currently no straightforward way to do this. Your approach is absolutely valid, as discussed in the Confluent FAQ: http://docs.confluent.io/current/streams/faq.html#how-can-i-convert-a-kstream-to-a-ktable-without-an-aggregation-step

This is the simplest approach with regard to the code. However, it has the disadvantages that (a) you need to manage an additional topic and that (b) it results in additional network traffic because data is written to and re-read from Kafka.



There is one alternative, using a "dummy reduce":

KStreamBuilder builder = new KStreamBuilder();
KStream<String, Long> stream = ...; // some computation that creates the derived KStream

KTable<String, Long> table = stream.groupByKey().reduce(
        new Reducer<Long>() {
            @Override
            public Long apply(Long aggValue, Long newValue) {
                return newValue;
            }
        },
        "dummy-aggregation-store");

This approach is somewhat more complex with regard to the code compared to option 1 but has the advantage that (a) no manual topic management is required and (b) re-reading the data from Kafka is not necessary.
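
For comparison, the same dummy reduce in the newer DSL (Kafka Streams 1.0+), where the anonymous Reducer collapses to a lambda and the state store is named via Materialized; a sketch, not code from the original answer:

import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;

KStream<String, Long> stream = ...; // some computation that creates the derived KStream

// dummy reduce: always keep the newest value per key
KTable<String, Long> table = stream
        .groupByKey()
        .reduce(
                (aggValue, newValue) -> newValue,
                Materialized.as("dummy-aggregation-store"));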



Overall, you need to decide for yourself which approach you prefer:

In option 2, Kafka Streams will create an internal changelog topic to back up the KTable for fault tolerance. Thus, both approaches require some additional storage in Kafka and result in additional network traffic. Overall, it’s a trade-off between slightly more complex code in option 2 versus manual topic management in option 1.

On apache-kafka-streams - Kafka Streams API: KStream to KTable, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/42937057/
