gpt4 book ai didi

java - KafkaStreams 如何在流聚合中指定Serdes?

转载 作者:行者123 更新时间:2023-12-04 05:14:20 26 4
gpt4 key购买 nike

我正在开发 Kafka 流应用程序,但在弄清楚如何使聚合工作时遇到了一些麻烦。

我有一个 KStream bankTransactions其中键的类型为 String和类型 JsonNode 的值所以我配置了我的应用程序的 Serdes

// Definition of the different Serdes used in the streams
final Serde<String> stringSerde = Serdes.String();
final Serde<JsonNode> jsonSerde = new JsonSerde();
final Serde<Long> longSerde = Serdes.Long();

config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, stringSerde.getClass().getName());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, jsonSerde.getClass().getName());

我想聚合 KTable<String, Long> 中的值其中键将相同但值将是 Long s 从我的 Json 中提取。

所以首先我写道:
KTable<String, Long> totalBalances = bankTransactions
.groupByKey()
.aggregate(
() -> 0L,
(key, transaction, balance) -> (Long)((Long)balance + transaction.get("amount").asLong()),
Materialized.as("bank-total-balance")
);

我在运行时收到以下错误:
Caused by: org.apache.kafka.streams.errors.StreamsException:
A serializer (value: org.apache.kafka.connect.json.JsonSerializer) is not compatible to
the actual value type (value type: java.lang.Long).
Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.

我知道 Kafka 正在提示,因为我正在尝试使用默认的 Json serdes 来序列化 Long .所以阅读 confluent's doc我试过这个
KTable<String, Long> totalBalances = bankTransactions
.groupByKey()
.aggregate(
() -> 0L,
(key, transaction, balance) -> (Long)((Long)balance + transaction.get("amount").asLong()),
Materialized.as("bank-total-balance").withValueSerde(Serdes.Long())
);

但是我在编译时遇到错误:
Error:(121, 89) java: incompatible types:
org.apache.kafka.common.serialization.Serde<java.lang.Long> cannot be converted
to org.apache.kafka.common.serialization.Serde<java.lang.Object>

我尝试了不同的方式来编写这段代码(例如使用 Serdes.long() 而不是我的 longSerdes ,试图参数化 Materialize 的类型,甚至试图将我的初始化器和聚合器编写为函数,Java 7 风格)但我不能不知道我做错了什么。

所以我的问题很简单:如何正确指定 aggregate 的 Serdes当它们不是默认的 Serdes 时应该使用吗?

最佳答案

似乎正确的语法如下:

KTable<String, Long> totalBalances = bankTransactions
.groupByKey()
.aggregate(
() -> 0L,
(key, transaction, balance) -> (Long)((Long)balance + transaction.get("amount").asLong()),
Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("bank-total-balances")
.withKeySerde(stringSerde)
.withValueSerde(longSerde)
);
Materialize.之后的三种类型是键、值和用于具体化 KTable 的存储之一,这个不应该改变。然后我们可以定义用于写入这个键值存储的 Serdes。

备注 我从 github 上的一个随机存储库中得到了这个语法,我仍然很乐意接受一个由一些文档支持的更精确答案的答案。

关于java - KafkaStreams 如何在流聚合中指定Serdes?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53283232/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com