gpt4 book ai didi

apache-kafka-streams - 如何在 Kafka 流中使用 HashMap 作为值创建状态存储?

转载 作者:行者123 更新时间:2023-12-04 18:30:44 24 4
gpt4 key购买 nike

我需要使用 String 键 HashMap 作为值创建一个状态存储。我尝试了以下两种方法。

// First method
StateStoreSupplier avgStoreNew = Stores.create("AvgsNew")
.withKeys(Serdes.String())
.withValues(HashMap.class)
.persistent()
.build();

// Second method
HashMap<String ,Double> h = new HashMap<String ,Double>();

StateStoreSupplier avgStore1 = Stores.create("Avgs")
.withKeys(Serdes.String())
.withValues(Serdes.serdeFrom(h.getClass()))
.persistent()
.build();

代码编译正常,没有任何错误,但出现运行时错误
io.confluent.examples.streams.WordCountProcessorAPIException in thread "main" java.lang.IllegalArgumentException: Unknown class for built-in serializer

有人可以建议我创建状态商店的正确方法是什么吗?

最佳答案

如果要创建状态存储,则需要为要使用的类型提供序列化器和反序列化器类。在 Kafka Stream 中,有一个名为 的抽象。塞尔德 将序列化器和反序列化器包装在一个类中。

如果您使用 .withValues(Class<K> keyClass)它必须认为

@param keyClass the class for the keys, which must be one of the types for which Kafka has built-in serdes



因为没有内置 SerdesHashMap您需要先实现一个(可能称为 HashMapSerde )并将此类提供给方法 .withValues(Serde<K> keySerde) .此外,您必须为 HashMap 实现实际的序列化器和反序列化器。 , 也。如果你知道你的 HashMap 的泛型类型,你应该指定它们(这使得序列化器和反序列化器的实现更简单。

像这样的东西(只是一个草图;省略了泛型类型):

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.Deserializer;

public class HashMapSerde implements Serde<HashMap> {

void configure(Map<String, ?> configs, boolean isKey) {
/* put your code here */
}

void close() {
/* put your code here */
}

Serializer<HashMap> serializer() {
return new Serializer<HashMap>() {
public void configure(Map<String, ?> configs, boolean isKey) {
/* put your code here */
}

public byte[] serialize(String topic, T data) {
/* put your code here */
}

public void close() {
/* put your code here */
}
};
}

Deserializer<HashMap> deserializer() {
return new Deserializer<HashMap>() {
public void configure(Map<String, ?> configs, boolean isKey) {
/* put your code here */
}

public T deserialize(String topic, byte[] data) {
/* put your code here */
}

public void close() {
/* put your code here */
}
};
}
}

如果您想查看有关如何实现(反)序列化程序和 Serde 的示例,看看 https://github.com/apache/kafka/tree/trunk/clients/src/main/java/org/apache/kafka/common/serializationhttps://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java

关于apache-kafka-streams - 如何在 Kafka 流中使用 HashMap 作为值创建状态存储?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39199301/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com