gpt4 book ai didi

java - 为 Kafka 流上的窗口数据创建 SerDes

转载 作者:行者123 更新时间:2023-12-02 09:54:01 25 4
gpt4 key购买 nike

我在创建一个 SerDes 来与我正在聚合的数据一起使用时遇到一些麻烦,并且需要通过“.to()”发送到另一个主题,但是,我需要为窗口数据创建一个 SerDes,而且我不知道该怎么做。

最佳答案

我们可以通过以下方式为窗口数据创建 Serilizer 和 DeSerilizer。

StringSerializer stringSerializer = new StringSerializer();
StringDeserializer stringDeserializer = new StringDeserializer();
Serde<String> stringSerde = Serdes.serdeFrom(stringSerializer,stringDeserializer);

WindowedSerializer<String> windowedSerializer = new WindowedSerializer<>(stringSerializer);
WindowedDeserializer<String> windowedDeserializer = new WindowedDeserializer<>(stringDeserializer);
Serde<Windowed<String>> windowedSerde = Serdes.serdeFrom(windowedSerializer,windowedDeserializer);

下面给出了窗口数据中 Serilizer/DeSerilizer 的使用。

KStream<String,StockTransaction> transactionKStream =  kStreamBuilder.stream(stringSerde,transactionSerde,"stocks");

transactionKStream.map((k,v)-> new KeyValue<>(v.getSymbol(),v))
.through(stringSerde, transactionSerde,"stocks-out")
.groupBy((k,v) -> k, stringSerde, transactionSerde)
.aggregate(StockTransactionCollector::new,
(k, v, stockTransactionCollector) -> stockTransactionCollector.add(v),
TimeWindows.of(10000),
collectorSerde, "stock-summaries")
.to(windowedSerde,collectorSerde,"transaction-summary");

我建议您阅读以下内容以获取更多信息。

https://www.programcreek.com/java-api-examples/index.php?api=org.apache.kafka.streams.kstream.internals.WindowedSerializer

关于java - 为 Kafka 流上的窗口数据创建 SerDes,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56135796/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com